perf: bench merkle stage (#1497)

joshieDo
2023-03-01 14:20:00 +08:00
committed by GitHub
parent 4839adeaef
commit 2884eae075
14 changed files with 766 additions and 280 deletions

View File

@@ -0,0 +1,137 @@
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
dump_stage::setup,
};
use eyre::Result;
use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx};
use reth_primitives::MAINNET;
use reth_provider::Transaction;
use reth_stages::{
stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage},
DefaultDB, Stage, StageId, UnwindInput,
};
use std::ops::DerefMut;
use tracing::info;
pub(crate) async fn dump_merkle_stage<DB: Database>(
db_tool: &mut DbTool<'_, DB>,
from: u64,
to: u64,
output_db: &PlatformPath<DbPath>,
should_run: bool,
) -> Result<()> {
let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;
output_db.update(|tx| {
tx.import_table_with_range::<tables::Headers, _>(&db_tool.db.tx()?, Some(from), to)
})??;
let tx = db_tool.db.tx()?;
let from_transition_rev =
tx.get::<tables::BlockTransitionIndex>(from)?.expect("there should be at least one.");
let to_transition_rev =
tx.get::<tables::BlockTransitionIndex>(to)?.expect("there should be at least one.");
output_db.update(|tx| {
tx.import_table_with_range::<tables::AccountChangeSet, _>(
&db_tool.db.tx()?,
Some(from_transition_rev),
to_transition_rev,
)
})??;
unwind_and_copy::<DB>(db_tool, (from, to), tip_block_number, &output_db).await?;
if should_run {
println!(
"\n# Merkle stage does not support dry run, so it will actually be committing changes."
);
run(output_db, to, from).await?;
}
Ok(())
}
/// Dry-run an unwind to the FROM block and copy the necessary table data to the new database.
async fn unwind_and_copy<DB: Database>(
db_tool: &mut DbTool<'_, DB>,
range: (u64, u64),
tip_block_number: u64,
output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
) -> eyre::Result<()> {
let (from, to) = range;
let mut unwind_tx = Transaction::new(db_tool.db)?;
let unwind = UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None };
let execute_input = reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
};
// Unwind hashes all the way to FROM
StorageHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap();
MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;
// Bring the plain state to TO (hashing stage execution requires it)
let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
exec_stage.commit_threshold = u64::MAX;
exec_stage
.unwind(
&mut unwind_tx,
UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None },
)
.await?;
// Bring hashes to TO
AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut unwind_tx, execute_input)
.await
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&mut unwind_tx, execute_input)
.await
.unwrap();
let unwind_inner_tx = unwind_tx.deref_mut();
// TODO: optimize; we can actually fetch just the entries we need
output_db.update(|tx| tx.import_dupsort::<tables::StorageChangeSet, _>(unwind_inner_tx))??;
output_db.update(|tx| tx.import_table::<tables::HashedAccount, _>(unwind_inner_tx))??;
output_db.update(|tx| tx.import_dupsort::<tables::HashedStorage, _>(unwind_inner_tx))??;
output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(unwind_inner_tx))??;
output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(unwind_inner_tx))??;
unwind_tx.drop()?;
Ok(())
}
/// Try to re-execute the stage straightaway
async fn run(
output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
to: u64,
from: u64,
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let mut tx = Transaction::new(&output_db)?;
MerkleStage::Execution {
clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch
}
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?;
info!(target: "reth::cli", "Success.");
Ok(())
}
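For orientation, a minimal sketch of the call the new CLI arm (added below) makes; the `tool`, `from`, `to`, `output_db`, and `dry_run` bindings are assumed to come from the parsed StageCommand:

// Hypothetical call site; mirrors the Stages::Merkle arm added to the dump-stage command.
dump_merkle_stage(&mut tool, from, to, output_db, dry_run).await?;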

View File

@@ -8,6 +8,9 @@ use hashing_account::dump_hashing_account_stage;
mod execution;
use execution::dump_execution_stage;
mod merkle;
use merkle::dump_merkle_stage;
use crate::{
db::DbTool,
dirs::{DbPath, PlatformPath},
@@ -45,6 +48,8 @@ pub enum Stages {
StorageHashing(StageCommand),
/// AccountHashing stage.
AccountHashing(StageCommand),
/// Merkle stage.
Merkle(StageCommand),
}
/// Stage command that takes a range
@@ -94,6 +99,9 @@ impl Command {
Stages::AccountHashing(StageCommand { output_db, from, to, dry_run, .. }) => {
dump_hashing_account_stage(&mut tool, *from, *to, output_db, *dry_run).await?
}
Stages::Merkle(StageCommand { output_db, from, to, dry_run, .. }) => {
dump_merkle_stage(&mut tool, *from, *to, output_db, *dry_run).await?
}
}
Ok(())

View File

@@ -10,7 +10,7 @@ readme = "README.md"
reth-codecs = { path = "../storage/codecs" }
reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../rpc/rpc-types" }
reth-network-api = { path = "../net/network-api"}
reth-network-api = { path = "../net/network-api" }
revm-primitives = "1.0"
async-trait = "0.1.57"
thiserror = "1.0.37"
@@ -26,16 +26,24 @@ futures = "0.3"
tokio-stream = "0.1.11"
rand = "0.8.5"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
], optional = true }
modular-bitfield = "0.11.2"
[dev-dependencies]
reth-db = { path = "../storage/db", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["sync"] }
arbitrary = { version = "1.1.7", features = ["derive"]}
arbitrary = { version = "1.1.7", features = ["derive"] }
hex-literal = "0.3"
secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] }
secp256k1 = { version = "0.24.2", default-features = false, features = [
"alloc",
"recovery",
"rand",
] }
[features]
bench = []

View File

@@ -1,9 +1,10 @@
use rand::{distributions::uniform::SampleRange, thread_rng, Rng};
use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng};
use reth_primitives::{
proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry,
Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
};
use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey};
use std::{collections::BTreeMap, ops::Sub};
// TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the
// relevant crates?
@@ -165,6 +166,115 @@ pub fn random_block_range(
blocks
}
type Transition = Vec<(Address, Account, Vec<StorageEntry>)>;
type AccountState = (Account, Vec<StorageEntry>);
/// Generate a range of transitions for given blocks and accounts.
/// Assumes all accounts start with an empty storage.
///
/// Returns a Vec of account and storage changes for each transition,
/// along with the final state of all accounts and storages.
pub fn random_transition_range<'a, IBlk, IAcc>(
blocks: IBlk,
accounts: IAcc,
n_changes: std::ops::Range<u64>,
key_range: std::ops::Range<u64>,
) -> (Vec<Transition>, BTreeMap<Address, AccountState>)
where
IBlk: IntoIterator<Item = &'a SealedBlock>,
IAcc: IntoIterator<Item = (Address, (Account, Vec<StorageEntry>))>,
{
let mut rng = rand::thread_rng();
let mut state: BTreeMap<_, _> = accounts
.into_iter()
.map(|(addr, (acc, st))| (addr, (acc, st.into_iter().map(|e| (e.key, e.value)).collect())))
.collect();
let valid_addresses = state.keys().copied().collect();
let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum();
let mut transitions = Vec::with_capacity(num_transitions);
(0..num_transitions).for_each(|i| {
let mut transition = Vec::new();
let (from, to, mut transfer, new_entries) =
random_account_change(&valid_addresses, n_changes.clone(), key_range.clone());
// extract from sending account
let (prev_from, _) = state.get_mut(&from).unwrap();
transition.push((from, *prev_from, Vec::new()));
transfer = transfer.min(prev_from.balance).max(U256::from(1));
prev_from.balance = prev_from.balance.wrapping_sub(transfer);
// deposit in receiving account and update storage
let (prev_to, storage): &mut (Account, BTreeMap<H256, U256>) = state.get_mut(&to).unwrap();
let old_entries = new_entries
.into_iter()
.filter_map(|entry| {
let old = if entry.value != U256::ZERO {
storage.insert(entry.key, entry.value)
} else {
let old = storage.remove(&entry.key);
if matches!(old, Some(U256::ZERO)) {
return None
}
old
};
Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry })
})
.collect();
transition.push((to, *prev_to, old_entries));
prev_to.balance = prev_to.balance.wrapping_add(transfer);
transitions.push(transition);
});
let final_state = state
.into_iter()
.map(|(addr, (acc, storage))| {
(addr, (acc, storage.into_iter().map(|v| v.into()).collect()))
})
.collect();
(transitions, final_state)
}
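A usage sketch, assuming `blocks` comes from random_block_range and `accounts` from one of the account generators (this mirrors how the Merkle stage tests and benchmark setup below seed state):

let (transitions, final_state) = random_transition_range(
    blocks.iter(),
    accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
    0..3,   // storage changes per transition
    0..256, // possible storage keys
);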
/// Generate a random account change.
///
/// Returns two addresses, a balance_change, and a Vec of new storage entries.
pub fn random_account_change(
valid_addresses: &Vec<Address>,
n_changes: std::ops::Range<u64>,
key_range: std::ops::Range<u64>,
) -> (Address, Address, U256, Vec<StorageEntry>) {
let mut rng = rand::thread_rng();
let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).cloned();
let addr_from = addresses.next().unwrap_or_else(Address::random);
let addr_to = addresses.next().unwrap_or_else(Address::random);
let balance_change = U256::from(rng.gen::<u64>());
let storage_changes = (0..n_changes.sample_single(&mut rng))
.map(|_| random_storage_entry(key_range.clone()))
.collect();
(addr_from, addr_to, balance_change, storage_changes)
}
/// Generate a random storage change.
pub fn random_storage_entry(key_range: std::ops::Range<u64>) -> StorageEntry {
let mut rng = rand::thread_rng();
let key = H256::from_low_u64_be(key_range.sample_single(&mut rng));
let value = U256::from(rng.gen::<u64>());
StorageEntry { key, value }
}
/// Generate a random Externally Owned Account (EOA, an account without contract code).
pub fn random_eoa_account() -> (Address, Account) {
let nonce: u64 = rand::random();

View File

@@ -12,6 +12,12 @@ pub struct StorageEntry {
pub value: U256,
}
impl From<(H256, U256)> for StorageEntry {
fn from((key, value): (H256, U256)) -> Self {
StorageEntry { key, value }
}
}
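This conversion is what lets the generators above map their (H256, U256) storage entries back into StorageEntry values; a minimal sketch:

let entry: StorageEntry = (H256::zero(), U256::from(1)).into();
assert_eq!(entry.value, U256::from(1));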
// NOTE: We remove main_codec and manually encode the subkey, compressing only the second
// part of the value. Compressing the whole value (even the SubKey) would break fetching
// values with seek_by_key_subkey

View File

@@ -5,21 +5,24 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler};
use reth_db::mdbx::{Env, WriteMap};
use reth_stages::{
stages::{SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
test_utils::TestTransaction,
ExecInput, Stage, StageId, UnwindInput,
};
use std::path::PathBuf;
mod setup;
use setup::StageRange;
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = transaction_lookup, account_hashing, senders, total_difficulty
config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None)));
targets = transaction_lookup, account_hashing, senders, total_difficulty, merkle
}
criterion_main!(benches);
const DEFAULT_NUM_BLOCKS: u64 = 10_000;
fn account_hashing(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
@@ -29,32 +32,42 @@ fn account_hashing(c: &mut Criterion) {
let num_blocks = 10_000;
let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks);
measure_stage_with_path(&mut group, stage, path, "AccountHashing".to_string(), execution_range);
measure_stage_with_path(
path,
&mut group,
setup::stage_unwind,
stage,
execution_range,
"AccountHashing".to_string(),
);
}
fn senders(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// we don't need to run each stage that many times
group.sample_size(10);
for batch in [1000usize, 10_000, 100_000, 250_000] {
let num_blocks = 10_000;
let stage = SenderRecoveryStage { commit_threshold: num_blocks, ..Default::default() };
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let label = format!("SendersRecovery-batch-{batch}");
measure_stage(&mut group, stage, num_blocks, label);
measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label);
}
}
fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// we don't need to run each stage that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS);
let num_blocks = 10_000;
let stage = TransactionLookupStage::new(num_blocks);
measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string());
measure_stage(
&mut group,
setup::stage_unwind,
stage,
0..DEFAULT_NUM_BLOCKS,
"TransactionLookup".to_string(),
);
}
fn total_difficulty(c: &mut Criterion) {
@@ -63,44 +76,60 @@ fn total_difficulty(c: &mut Criterion) {
group.warm_up_time(std::time::Duration::from_millis(2000));
// we don't need to run each stage that many times
group.sample_size(10);
let num_blocks = 10_000;
let stage = TotalDifficultyStage::default();
measure_stage(&mut group, stage, num_blocks, "TotalDifficulty".to_string());
measure_stage(
&mut group,
setup::stage_unwind,
stage,
0..DEFAULT_NUM_BLOCKS,
"TotalDifficulty".to_string(),
);
}
fn measure_stage_with_path<S: Clone + Default + Stage<Env<WriteMap>>>(
group: &mut BenchmarkGroup<WallTime>,
stage: S,
fn merkle(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// we don't need to run each stage that many times
group.sample_size(10);
let stage = MerkleStage::Both { clean_threshold: u64::MAX };
measure_stage(
&mut group,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS + 1,
"Merkle-incremental".to_string(),
);
let stage = MerkleStage::Both { clean_threshold: 0 };
measure_stage(
&mut group,
setup::unwind_hashes,
stage,
1..DEFAULT_NUM_BLOCKS + 1,
"Merkle-fullhash".to_string(),
);
}
fn measure_stage_with_path<F, S>(
path: PathBuf,
group: &mut BenchmarkGroup<WallTime>,
setup: F,
stage: S,
stage_range: StageRange,
label: String,
stage_range: (ExecInput, UnwindInput),
) {
) where
S: Clone + Stage<Env<WriteMap>>,
F: Fn(S, &TestTransaction, StageRange),
{
let tx = TestTransaction::new(&path);
let (input, unwind) = stage_range;
let (input, _) = stage_range;
group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
// Clear previous run
stage
.unwind(&mut db_tx, unwind)
.await
.map_err(|e| {
eyre::eyre!(format!(
"{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.",
path.display()
))
})
.unwrap();
db_tx.commit().unwrap();
});
setup(stage.clone(), &tx, stage_range)
},
|_| async {
let mut stage = stage.clone();
@@ -112,25 +141,34 @@ fn measure_stage_with_path<S: Clone + Default + Stage<Env<WriteMap>>>(
});
}
fn measure_stage<S: Clone + Default + Stage<Env<WriteMap>>>(
fn measure_stage<F, S>(
group: &mut BenchmarkGroup<WallTime>,
setup: F,
stage: S,
num_blocks: u64,
block_interval: std::ops::Range<u64>,
label: String,
) {
let path = setup::txs_testdata(num_blocks as usize);
) where
S: Clone + Stage<Env<WriteMap>>,
F: Fn(S, &TestTransaction, StageRange),
{
let path = setup::txs_testdata(block_interval.end);
measure_stage_with_path(
group,
stage,
path,
label,
group,
setup,
stage,
(
ExecInput {
previous_stage: Some((StageId("Another"), num_blocks)),
..Default::default()
previous_stage: Some((StageId("Another"), block_interval.end)),
stage_progress: Some(block_interval.start),
},
UnwindInput {
stage_progress: block_interval.end,
unwind_to: block_interval.start,
bad_block: None,
},
UnwindInput::default(),
),
)
label,
);
}

View File

@@ -1,4 +1,4 @@
use super::constants;
use super::{constants, StageRange};
use reth_db::{
cursor::DbCursorRO, database::Database, tables, transaction::DbTx, Error as DbError,
};
@@ -15,9 +15,7 @@ use std::path::{Path, PathBuf};
/// generate its own random data.
///
/// Returns the path to the database file, the stage, and the range of stage execution if it exists.
pub fn prepare_account_hashing(
num_blocks: u64,
) -> (PathBuf, AccountHashingStage, (ExecInput, UnwindInput)) {
pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) {
let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) {
Ok(db) => {
let path = Path::new(&db).to_path_buf();
@@ -30,7 +28,7 @@ pub fn prepare_account_hashing(
(path, AccountHashingStage::default(), stage_range)
}
fn find_stage_range(db: &Path) -> (ExecInput, UnwindInput) {
fn find_stage_range(db: &Path) -> StageRange {
let mut stage_range = None;
TestTransaction::new(db)
.tx
@@ -54,7 +52,7 @@ fn find_stage_range(db: &Path) -> (ExecInput, UnwindInput) {
stage_range.expect("Could not find the stage range from the external DB.")
}
fn generate_testdata_db(num_blocks: u64) -> (PathBuf, (ExecInput, UnwindInput)) {
fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
let opts = SeedOpts {
blocks: 0..num_blocks + 1,
accounts: 0..10_000,

View File

@@ -1,34 +1,165 @@
use itertools::concat;
use reth_db::{
cursor::DbCursorRO,
mdbx::{Env, WriteMap},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::test_utils::generators::random_block_range;
use reth_primitives::H256;
use reth_stages::test_utils::TestTransaction;
use std::path::{Path, PathBuf};
use reth_interfaces::test_utils::generators::{
random_block_range, random_contract_account_range, random_eoa_account_range,
random_transition_range,
};
use reth_primitives::{Account, Address, SealedBlock, H256};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::TestTransaction,
DBTrieLoader, ExecInput, Stage, UnwindInput,
};
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};
mod constants;
mod account_hashing;
pub use account_hashing::*;
// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB).
// Returns the path to the database file and the number of blocks written.
pub fn txs_testdata(num_blocks: usize) -> PathBuf {
pub(crate) type StageRange = (ExecInput, UnwindInput);
pub(crate) fn stage_unwind<S: Clone + Stage<Env<WriteMap>>>(
stage: S,
tx: &TestTransaction,
range: StageRange,
) {
let (_, unwind) = range;
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
// Clear previous run
stage
.unwind(&mut db_tx, unwind)
.await
.map_err(|e| {
eyre::eyre!(format!(
"{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.",
tx.path.as_ref().unwrap().display()
))
})
.unwrap();
db_tx.commit().unwrap();
});
}
pub(crate) fn unwind_hashes<S: Clone + Stage<Env<WriteMap>>>(
stage: S,
tx: &TestTransaction,
range: StageRange,
) {
let (input, unwind) = range;
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
StorageHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap();
AccountHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap();
let target_root = db_tx.get_header(unwind.unwind_to).unwrap().state_root;
let _ = db_tx.delete::<tables::AccountsTrie>(target_root, None);
// Clear previous run
stage.unwind(&mut db_tx, unwind).await.unwrap();
AccountHashingStage::default().execute(&mut db_tx, input).await.unwrap();
StorageHashingStage::default().execute(&mut db_tx, input).await.unwrap();
db_tx.commit().unwrap();
});
}
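Both helpers are passed by name as the `setup` argument of the benchmark harness; for example, the Merkle benchmark above wires up unwind_hashes like this:

measure_stage(
    &mut group,
    setup::unwind_hashes,
    stage,
    1..DEFAULT_NUM_BLOCKS + 1,
    "Merkle-incremental".to_string(),
);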
// Helper for generating testdata for the benchmarks.
// Returns the path to the database file.
pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
let txs_range = 100..150;
// number of storage changes per transition
let n_changes = 0..3;
// range of possible values for a storage key
let key_range = 0..300;
// number of accounts
let n_eoa = 131;
let n_contract = 31;
if !path.exists() {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let tx = TestTransaction::new(&path);
// This takes a while because it does sig recovery internally
let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range);
let accounts: BTreeMap<Address, Account> = concat([
random_eoa_account_range(0..n_eoa),
random_contract_account_range(&mut (0..n_contract)),
])
.into_iter()
.collect();
let mut blocks = random_block_range(0..num_blocks + 1, H256::zero(), txs_range);
let (transitions, start_state) = random_transition_range(
blocks.iter().take(2),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
n_changes.clone(),
key_range.clone(),
);
tx.insert_accounts_and_storages(start_state.clone()).unwrap();
// make the first block after genesis have a valid state root
let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap();
let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal();
updated_header.state_root = root;
*second_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_second };
let offset = transitions.len() as u64;
tx.insert_transitions(transitions, None).unwrap();
let (transitions, final_state) =
random_transition_range(blocks.iter().skip(2), start_state, n_changes, key_range);
tx.insert_transitions(transitions, Some(offset)).unwrap();
tx.insert_accounts_and_storages(final_state).unwrap();
// make the last block have a valid state root
let root = {
let mut tx_mut = tx.inner();
let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap();
tx_mut.commit().unwrap();
root
};
tx.query(|tx| {
assert!(tx.get::<tables::AccountsTrie>(root)?.is_some());
Ok(())
})
.unwrap();
let last_block = blocks.last_mut().unwrap();
let cloned_last = last_block.clone();
let mut updated_header = cloned_last.header.unseal();
updated_header.state_root = root;
*last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last };
// insert all blocks
tx.insert_blocks(blocks.iter(), None).unwrap();
// initialize TD

View File

@@ -77,6 +77,7 @@ pub use error::*;
pub use id::*;
pub use pipeline::*;
pub use stage::*;
pub use trie::DBTrieLoader;
// NOTE: Needed so the link in the module-level rustdoc works.
#[allow(unused_extern_crates)]

View File

@@ -160,33 +160,40 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// The assumption we are okay with is that the plain state represents the
// `previous_stage_progress` state.
.map(|(address, storage)| {
storage
.into_iter()
.map(|key| {
plain_storage
.seek_by_key_subkey(address, key)
.map(|ret| (keccak256(key), ret.map(|e| e.value)))
})
.collect::<Result<BTreeMap<_, _>, _>>()
.map(|storage| (keccak256(address), storage))
let res = (
keccak256(address),
storage
.into_iter()
.map(|key| {
Ok::<Option<_>, reth_db::Error>(
plain_storage
.seek_by_key_subkey(address, key)?
.filter(|v| v.key == key)
.map(|ret| (keccak256(key), ret.value)),
)
})
.collect::<Result<Vec<Option<_>>, _>>()?
.into_iter()
.flatten()
.collect::<BTreeMap<_, _>>(),
);
Ok::<_, reth_db::Error>(res)
})
.collect::<Result<BTreeMap<_, _>, _>>()?
.into_iter()
// Hash the address and key and apply them to HashedStorage (if the storage value is
// None, just remove it)
.try_for_each(|(address, storage)| {
.try_for_each(|(hashed_address, storage)| {
storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> {
if hashed_storage
.seek_by_key_subkey(address, key)?
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
}
if let Some(value) = val {
hashed_storage.upsert(address, StorageEntry { key, value })?;
}
hashed_storage.upsert(hashed_address, StorageEntry { key, value: val })?;
Ok(())
})
})?;
@@ -232,9 +239,9 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
.collect::<BTreeMap<_, _>>()
.into_iter()
// Apply values to HashedStorage (if the value is zero, just remove it)
.try_for_each(|((address, key), value)| -> Result<(), StageError> {
.try_for_each(|((hashed_address, key), value)| -> Result<(), StageError> {
if hashed_storage
.seek_by_key_subkey(address, key)?
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
@@ -242,7 +249,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
}
if value != U256::ZERO {
hashed_storage.upsert(address, StorageEntry { key, value })?;
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})?;

View File

@@ -14,6 +14,9 @@ pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecute");
/// The [`StageId`] of the merkle hashing unwind stage.
pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind");
/// The [`StageId`] of the merkle hashing unwind and execution stage.
pub const MERKLE_BOTH: StageId = StageId("MerkleBoth");
/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
@@ -35,7 +38,7 @@ pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind");
/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
/// - [`MerkleStage::Execution`]
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum MerkleStage {
/// The execution portion of the merkle stage.
Execution {
@@ -46,7 +49,9 @@ pub enum MerkleStage {
/// The unwind portion of the merkle stage.
Unwind,
#[cfg(test)]
/// Able to execute and unwind. Used for tests
#[cfg(any(test, feature = "test-utils"))]
#[allow(missing_docs)]
Both { clean_threshold: u64 },
}
@@ -69,8 +74,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
match self {
MerkleStage::Execution { .. } => MERKLE_EXECUTION,
MerkleStage::Unwind => MERKLE_UNWIND,
#[cfg(test)]
MerkleStage::Both { .. } => unreachable!(),
#[cfg(any(test, feature = "test-utils"))]
MerkleStage::Both { .. } => MERKLE_BOTH,
}
}
@@ -89,7 +94,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
MerkleStage::Execution { clean_threshold } => *clean_threshold,
#[cfg(test)]
#[cfg(any(test, feature = "test-utils"))]
MerkleStage::Both { clean_threshold } => *clean_threshold,
};
@@ -156,10 +161,22 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let from_transition = tx.get_block_transition(input.unwind_to)?;
let to_transition = tx.get_block_transition(input.stage_progress)?;
loader
let block_root = loader
.update_root(tx, current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?;
if block_root != target_root {
let unwind_to = input.unwind_to;
warn!(target: "sync::stages::merkle::unwind", ?unwind_to, got = ?block_root, expected = ?target_root, "Block's state root failed verification");
return Err(StageError::Validation {
block: unwind_to,
error: consensus::Error::BodyStateRootDiff {
got: block_root,
expected: target_root,
},
})
}
info!(target: "sync::stages::merkle::unwind", "Stage finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
@@ -175,12 +192,11 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, StoredBlockBody},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::test_utils::generators::{
random_block, random_block_range, random_contract_account_range,
random_block, random_block_range, random_contract_account_range, random_transition_range,
};
use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256};
use std::collections::BTreeMap;
@@ -276,12 +292,16 @@ mod tests {
let end = input.previous_stage_progress() + 1;
let n_accounts = 31;
let mut accounts = random_contract_account_range(&mut (0..n_accounts));
let accounts = random_contract_account_range(&mut (0..n_accounts))
.into_iter()
.collect::<BTreeMap<_, _>>();
let SealedBlock { header, body, ommers, withdrawals } =
random_block(stage_progress, None, Some(0), None);
let mut header = header.unseal();
header.state_root = self.generate_initial_trie(&accounts)?;
header.state_root =
self.generate_initial_trie(accounts.iter().map(|(k, v)| (*k, *v)))?;
let sealed_head = SealedBlock { header: header.seal_slow(), body, ommers, withdrawals };
let head_hash = sealed_head.hash();
@@ -289,64 +309,18 @@ mod tests {
blocks.extend(random_block_range((stage_progress + 1)..end, head_hash, 0..3));
self.tx.insert_headers(blocks.iter().map(|block| &block.header))?;
self.tx.insert_blocks(blocks.iter(), None)?;
let (mut transition_id, mut tx_id) = (0, 0);
let (transitions, final_state) = random_transition_range(
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..3,
0..256,
);
let mut storages: BTreeMap<Address, BTreeMap<H256, U256>> = BTreeMap::new();
self.tx.insert_transitions(transitions, None)?;
for progress in blocks.iter() {
// Insert last progress data
self.tx.commit(|tx| {
let body = StoredBlockBody {
start_tx_id: tx_id,
tx_count: progress.body.len() as u64,
};
progress.body.iter().try_for_each(|transaction| {
tx.put::<tables::TxHashNumber>(transaction.hash(), tx_id)?;
tx.put::<tables::Transactions>(tx_id, transaction.clone())?;
tx.put::<tables::TxTransitionIndex>(tx_id, transition_id)?;
// seed account changeset
let (addr, prev_acc) = accounts
.get_mut(rand::random::<usize>() % n_accounts as usize)
.unwrap();
let acc_before_tx =
AccountBeforeTx { address: *addr, info: Some(*prev_acc) };
tx.put::<tables::AccountChangeSet>(transition_id, acc_before_tx)?;
prev_acc.nonce += 1;
prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1));
let new_entry = StorageEntry {
key: keccak256([rand::random::<u8>()]),
value: U256::from(rand::random::<u8>() % 30 + 1),
};
let storage = storages.entry(*addr).or_default();
let old_value = storage.entry(new_entry.key).or_default();
tx.put::<tables::StorageChangeSet>(
(transition_id, *addr).into(),
StorageEntry { key: new_entry.key, value: *old_value },
)?;
*old_value = new_entry.value;
tx_id += 1;
transition_id += 1;
Ok(())
})?;
tx.put::<tables::BlockTransitionIndex>(progress.number, transition_id)?;
tx.put::<tables::BlockBodies>(progress.number, body)
})?;
}
self.insert_accounts(&accounts)?;
self.insert_storages(&storages)?;
self.tx.insert_accounts_and_storages(final_state)?;
let last_block_number = end - 1;
let root = self.state_root()?;
@@ -471,9 +445,11 @@ mod tests {
pub(crate) fn generate_initial_trie(
&self,
accounts: &[(Address, Account)],
accounts: impl IntoIterator<Item = (Address, Account)>,
) -> Result<H256, TestRunnerError> {
self.insert_accounts(accounts)?;
self.tx.insert_accounts_and_storages(
accounts.into_iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))),
)?;
let loader = DBTrieLoader::default();
@@ -485,57 +461,6 @@ mod tests {
Ok(root)
}
pub(crate) fn insert_accounts(
&self,
accounts: &[(Address, Account)],
) -> Result<(), TestRunnerError> {
for (addr, acc) in accounts.iter() {
self.tx.commit(|tx| {
tx.put::<tables::PlainAccountState>(*addr, *acc)?;
tx.put::<tables::HashedAccount>(keccak256(addr), *acc)?;
Ok(())
})?;
}
Ok(())
}
fn insert_storages(
&self,
storages: &BTreeMap<Address, BTreeMap<H256, U256>>,
) -> Result<(), TestRunnerError> {
self.tx
.commit(|tx| {
storages.iter().try_for_each(|(&addr, storage)| {
storage.iter().try_for_each(|(&key, &value)| {
let entry = StorageEntry { key, value };
tx.put::<tables::PlainStorageState>(addr, entry)
})
})?;
storages
.iter()
.map(|(addr, storage)| {
(
keccak256(addr),
storage
.iter()
.filter(|(_, &value)| value != U256::ZERO)
.map(|(key, value)| (keccak256(key), value)),
)
})
.collect::<BTreeMap<_, _>>()
.into_iter()
.try_for_each(|(addr, storage)| {
storage.into_iter().try_for_each(|(key, &value)| {
let entry = StorageEntry { key, value };
tx.put::<tables::HashedStorage>(addr, entry)
})
})?;
Ok(())
})
.map_err(|e| e.into())
}
fn check_root(&self, previous_stage_progress: u64) -> Result<(), TestRunnerError> {
if previous_stage_progress != 0 {
let block_root =

View File

@@ -1,19 +1,26 @@
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
mdbx::{
test_utils::{create_test_db, create_test_db_with_path},
tx::Tx,
Env, EnvKind, WriteMap, RW,
},
models::StoredBlockBody,
models::{AccountBeforeTx, BlockNumHash, StoredBlockBody},
table::Table,
tables,
transaction::{DbTx, DbTxMut},
Error as DbError,
};
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256};
use reth_primitives::{
keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, U256,
};
use reth_provider::Transaction;
use std::{borrow::Borrow, path::Path, sync::Arc};
use std::{
borrow::Borrow,
collections::BTreeMap,
path::{Path, PathBuf},
sync::Arc,
};
/// The [TestTransaction] is used as an internal
/// database for testing stage implementations.
@@ -26,18 +33,22 @@ use std::{borrow::Borrow, path::Path, sync::Arc};
pub struct TestTransaction {
/// WriteMap DB
pub tx: Arc<Env<WriteMap>>,
pub path: Option<PathBuf>,
}
impl Default for TestTransaction {
/// Create a new instance of [TestTransaction]
fn default() -> Self {
Self { tx: create_test_db::<WriteMap>(EnvKind::RW) }
Self { tx: create_test_db::<WriteMap>(EnvKind::RW), path: None }
}
}
impl TestTransaction {
pub fn new(path: &Path) -> Self {
Self { tx: Arc::new(create_test_db_with_path::<WriteMap>(EnvKind::RW, path)) }
Self {
tx: Arc::new(create_test_db_with_path::<WriteMap>(EnvKind::RW, path)),
path: Some(path.to_path_buf()),
}
}
/// Return a database wrapped in [Transaction].
@@ -177,23 +188,20 @@ impl TestTransaction {
})
}
/// Inserts a single [SealedHeader] into the corresponding tables of the headers stage.
fn insert_header(tx: &mut Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())
}
/// Insert ordered collection of [SealedHeader] into the corresponding tables
/// that are supposed to be populated by the headers stage.
pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError>
where
I: Iterator<Item = &'a SealedHeader>,
{
self.commit(|tx| {
let headers = headers.collect::<Vec<_>>();
for header in headers {
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())?;
}
Ok(())
})
self.commit(|tx| headers.into_iter().try_for_each(|header| Self::insert_header(tx, header)))
}
/// Inserts total difficulty of headers into the corresponding tables.
@@ -204,23 +212,19 @@ impl TestTransaction {
I: Iterator<Item = &'a SealedHeader>,
{
self.commit(|tx| {
let headers = headers.collect::<Vec<_>>();
let mut td = U256::ZERO;
for header in headers {
headers.into_iter().try_for_each(|header| {
Self::insert_header(tx, header)?;
td += header.difficulty;
tx.put::<tables::HeaderTD>(header.number, td.into())?;
tx.put::<tables::CanonicalHeaders>(header.number, header.hash())?;
tx.put::<tables::HeaderNumbers>(header.hash(), header.number)?;
tx.put::<tables::Headers>(header.number, header.clone().unseal())?;
}
Ok(())
tx.put::<tables::HeaderTD>(header.number, td.into())
})
})
}
/// Insert ordered collection of [SealedBlock] into corresponding tables.
/// Superset functionality of [TestTransaction::insert_headers].
///
/// Assumes that there's a single transition for each transaction (i.e. no block rewards).
pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option<u64>) -> Result<(), DbError>
where
I: Iterator<Item = &'a SealedBlock>,
@@ -228,12 +232,8 @@ impl TestTransaction {
self.commit(|tx| {
let mut current_tx_id = tx_offset.unwrap_or_default();
for block in blocks {
// Insert into header tables.
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
tx.put::<tables::Headers>(block.number, block.header.clone().unseal())?;
blocks.into_iter().try_for_each(|block| {
Self::insert_header(tx, &block.header)?;
// Insert into body tables.
tx.put::<tables::BlockBodies>(
block.number,
@@ -242,13 +242,88 @@ impl TestTransaction {
tx_count: block.body.len() as u64,
},
)?;
for body_tx in block.body.clone() {
tx.put::<tables::Transactions>(current_tx_id, body_tx)?;
block.body.iter().try_for_each(|body_tx| {
tx.put::<tables::TxTransitionIndex>(current_tx_id, current_tx_id)?;
tx.put::<tables::Transactions>(current_tx_id, body_tx.clone())?;
current_tx_id += 1;
}
}
Ok(())
})?;
tx.put::<tables::BlockTransitionIndex>(block.number, current_tx_id)
})
})
}
Ok(())
/// Insert a collection of ([Address], [Account]) pairs together with their storages into the corresponding tables.
pub fn insert_accounts_and_storages<I, S>(&self, accounts: I) -> Result<(), DbError>
where
I: IntoIterator<Item = (Address, (Account, S))>,
S: IntoIterator<Item = StorageEntry>,
{
self.commit(|tx| {
accounts.into_iter().try_for_each(|(address, (account, storage))| {
let hashed_address = keccak256(address);
// Insert into account tables.
tx.put::<tables::PlainAccountState>(address, account)?;
tx.put::<tables::HashedAccount>(hashed_address, account)?;
// Insert into storage tables.
storage.into_iter().filter(|e| e.value != U256::ZERO).try_for_each(|entry| {
let hashed_entry = StorageEntry { key: keccak256(entry.key), ..entry };
let mut cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
if let Some(e) = cursor
.seek_by_key_subkey(address, entry.key)?
.filter(|e| e.key == entry.key)
{
cursor.delete_current()?;
}
cursor.upsert(address, entry)?;
let mut cursor = tx.cursor_dup_write::<tables::HashedStorage>()?;
if let Some(e) = cursor
.seek_by_key_subkey(hashed_address, hashed_entry.key)?
.filter(|e| e.key == hashed_entry.key)
{
cursor.delete_current()?;
}
cursor.upsert(hashed_address, hashed_entry)?;
Ok(())
})
})
})
}
/// Insert a collection of transitions (Vec<([Address], [Account], Vec<[StorageEntry]>)>) into the
/// corresponding changeset tables.
pub fn insert_transitions<I>(
&self,
transitions: I,
transition_offset: Option<u64>,
) -> Result<(), DbError>
where
I: IntoIterator<Item = Vec<(Address, Account, Vec<StorageEntry>)>>,
{
let offset = transition_offset.unwrap_or_default();
self.commit(|tx| {
transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| {
changes.into_iter().try_for_each(|(address, old_account, old_storage)| {
let tid = offset + transition_id as u64;
// Insert into account changeset.
tx.put::<tables::AccountChangeSet>(
tid,
AccountBeforeTx { address, info: Some(old_account) },
)?;
let tid_address = (tid, address).into();
// Insert into storage changeset.
old_storage.into_iter().try_for_each(|entry| {
tx.put::<tables::StorageChangeSet>(tid_address, entry)
})
})
})
})
}
}
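A minimal sketch of how the new helpers combine when seeding a benchmark database (this mirrors txs_testdata above; `transitions` and `final_state` are assumed to come from random_transition_range):

let tx = TestTransaction::new(&path);
tx.insert_transitions(transitions, None).unwrap();
tx.insert_accounts_and_storages(final_state).unwrap();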

View File

@@ -24,7 +24,7 @@ use std::{
use tracing::*;
#[derive(Debug, thiserror::Error)]
pub(crate) enum TrieError {
pub enum TrieError {
#[error("Some error occurred: {0}")]
InternalError(#[from] cita_trie::TrieError),
#[error("The root node wasn't found in the DB")]
@@ -54,17 +54,33 @@ where
Ok(<Self as cita_trie::DB>::get(self, key)?.is_some())
}
fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Self::Error> {
// Caching and bulk inserting shouldn't be needed, as the data is ordered
self.tx.put::<tables::AccountsTrie>(H256::from_slice(key.as_slice()), value)?;
fn insert(&self, _key: Vec<u8>, _value: Vec<u8>) -> Result<(), Self::Error> {
unreachable!("Use batch instead.");
}
// Insert a batch of trie nodes into the accounts trie table.
fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
for (key, value) in keys.into_iter().zip(values.into_iter()) {
cursor.upsert(H256::from_slice(key.as_slice()), value)?;
}
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
self.tx.delete::<tables::AccountsTrie>(H256::from_slice(key), None)?;
fn remove_batch(&self, keys: &[Vec<u8>]) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
for key in keys {
if cursor.seek_exact(H256::from_slice(key.as_slice()))?.is_some() {
cursor.delete_current()?;
}
}
Ok(())
}
fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
unreachable!("Use batch instead.");
}
fn flush(&self) -> Result<(), Self::Error> {
Ok(())
}
@@ -104,31 +120,49 @@ where
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
let mut cursor = self.tx.cursor_dup_read::<tables::StoragesTrie>()?;
Ok(cursor.seek_by_key_subkey(self.key, H256::from_slice(key))?.map(|entry| entry.node))
let subkey = H256::from_slice(key);
Ok(cursor
.seek_by_key_subkey(self.key, subkey)?
.filter(|entry| entry.hash == subkey)
.map(|entry| entry.node))
}
fn contains(&self, key: &[u8]) -> Result<bool, Self::Error> {
Ok(<Self as cita_trie::DB>::get(self, key)?.is_some())
}
fn insert(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Self::Error> {
// Caching and bulk inserting shouldn't be needed, as the data is ordered
self.tx.put::<tables::StoragesTrie>(
self.key,
StorageTrieEntry { hash: H256::from_slice(key.as_slice()), node: value },
)?;
fn insert(&self, _key: Vec<u8>, _value: Vec<u8>) -> Result<(), Self::Error> {
unreachable!("Use batch instead.");
}
/// Insert a batch of trie nodes into the storages trie table.
fn insert_batch(&self, keys: Vec<Vec<u8>>, values: Vec<Vec<u8>>) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
for (key, node) in keys.into_iter().zip(values.into_iter()) {
let hash = H256::from_slice(key.as_slice());
if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() {
cursor.delete_current()?;
}
cursor.upsert(self.key, StorageTrieEntry { hash, node })?;
}
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<(), Self::Error> {
fn remove_batch(&self, keys: &[Vec<u8>]) -> Result<(), Self::Error> {
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
cursor
.seek_by_key_subkey(self.key, H256::from_slice(key))?
.map(|_| cursor.delete_current())
.transpose()?;
for key in keys {
let hash = H256::from_slice(key.as_slice());
if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() {
cursor.delete_current()?;
}
}
Ok(())
}
fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> {
unreachable!("Use batch instead.");
}
fn flush(&self) -> Result<(), Self::Error> {
Ok(())
}
@@ -139,7 +173,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> {
fn new(tx: &'tx Transaction<'itx, DB>, key: H256) -> Result<Self, TrieError> {
let root = EMPTY_ROOT;
let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
if cursor.seek_by_key_subkey(key, root)?.is_none() {
if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() {
tx.put::<tables::StoragesTrie>(
key,
StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() },
@@ -155,6 +189,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> {
}
tx.cursor_dup_read::<tables::StoragesTrie>()?
.seek_by_key_subkey(key, root)?
.filter(|entry| entry.hash == root)
.ok_or(TrieError::MissingRoot(root))?;
Ok(Self { tx, key })
}
@@ -190,12 +225,14 @@ impl EthAccount {
}
}
/// Struct for calculating the root of a Merkle Patricia trie,
/// while populating the database with intermediate hashes.
#[derive(Debug, Default)]
pub(crate) struct DBTrieLoader;
pub struct DBTrieLoader;
impl DBTrieLoader {
/// Calculates the root of the state trie, saving intermediate hashes in the database.
pub(crate) fn calculate_root<DB: Database>(
pub fn calculate_root<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
) -> Result<H256, TrieError> {
@@ -255,7 +292,7 @@ impl DBTrieLoader {
}
/// Calculates the root of the state trie by updating an existing trie.
pub(crate) fn update_root<DB: Database>(
pub fn update_root<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
root: H256,
@@ -272,20 +309,21 @@ impl DBTrieLoader {
let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?;
for (address, changed_storages) in changed_accounts {
if let Some(account) = trie.get(address.as_slice())? {
let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root;
let storage_root = if let Some(account) = trie.get(address.as_slice())? {
trie.remove(address.as_bytes())?;
if let Some((_, account)) = accounts_cursor.seek_exact(address)? {
let value = EthAccount::from_with_root(
account,
self.update_storage_root(tx, storage_root, address, changed_storages)?,
);
let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root;
self.update_storage_root(tx, storage_root, address, changed_storages)?
} else {
self.calculate_storage_root(tx, address)?
};
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
trie.insert(address.as_bytes().to_vec(), out)?;
}
if let Some((_, account)) = accounts_cursor.seek_exact(address)? {
let value = EthAccount::from_with_root(account, storage_root);
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
trie.insert(address.as_bytes().to_vec(), out)?;
}
}
@@ -310,7 +348,7 @@ impl DBTrieLoader {
for key in changed_storages {
if let Some(StorageEntry { value, .. }) =
storage_cursor.seek_by_key_subkey(address, key)?
storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key)
{
let out = encode_fixed_size(&value).to_vec();
trie.insert(key.as_bytes().to_vec(), out)?;

View File

@@ -1,3 +1,4 @@
#[allow(unused_imports)]
use reth_db::{
database::Database,
mdbx::{test_utils::create_test_db_with_path, EnvKind, WriteMap},
@@ -7,12 +8,15 @@ use reth_db::{
use std::path::Path;
/// Path where the DB is initialized for benchmarks.
#[allow(unused)]
const BENCH_DB_PATH: &str = "/tmp/reth-benches";
/// Used for RandomRead and RandomWrite benchmarks.
#[allow(unused)]
const RANDOM_INDEXES: [usize; 10] = [23, 2, 42, 5, 3, 99, 54, 0, 33, 64];
/// Returns bench vectors in the format: `Vec<(Key, EncodedKey, Value, CompressedValue)>`.
#[allow(unused)]
fn load_vectors<T: reth_db::table::Table>() -> Vec<(T::Key, bytes::Bytes, T::Value, bytes::Bytes)>
where
T: Default,