mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-29 09:08:05 -05:00
feat: refactor few stages to providers, introduce insert_block (#1474)
Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -4858,6 +4858,7 @@ dependencies = [
|
||||
"crunchy",
|
||||
"derive_more",
|
||||
"ethers-core",
|
||||
"eyre",
|
||||
"fixed-hash",
|
||||
"hash-db",
|
||||
"hex",
|
||||
@@ -4879,6 +4880,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"shellexpand",
|
||||
"strum",
|
||||
"sucds",
|
||||
"test-fuzz",
|
||||
@@ -4892,14 +4894,22 @@ dependencies = [
|
||||
name = "reth-provider"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"auto_impl",
|
||||
"cita_trie",
|
||||
"hasher",
|
||||
"itertools 0.10.5",
|
||||
"parking_lot 0.12.1",
|
||||
"proptest",
|
||||
"reth-db",
|
||||
"reth-interfaces",
|
||||
"reth-primitives",
|
||||
"reth-revm-primitives",
|
||||
"reth-rlp",
|
||||
"reth-tracing",
|
||||
"revm-primitives",
|
||||
"thiserror",
|
||||
"triehash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5116,11 +5126,9 @@ dependencies = [
|
||||
"arbitrary",
|
||||
"assert_matches",
|
||||
"async-trait",
|
||||
"cita_trie",
|
||||
"criterion",
|
||||
"eyre",
|
||||
"futures-util",
|
||||
"hasher",
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"num-traits",
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
pub mod eth_dao_fork;
|
||||
|
||||
/// Execution result types
|
||||
pub mod execution_result;
|
||||
/// Execution result types.
|
||||
pub use reth_provider::execution_result;
|
||||
/// Executor
|
||||
pub mod executor;
|
||||
|
||||
@@ -9,15 +9,11 @@ description = "Commonly used types in reth."
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-rlp = { path = "../rlp", features = [
|
||||
"std",
|
||||
"derive",
|
||||
"ethereum-types",
|
||||
] }
|
||||
reth-rlp = { path = "../rlp", features = ["std", "derive", "ethereum-types"] }
|
||||
reth-rlp-derive = { path = "../rlp/rlp-derive" }
|
||||
reth-codecs = { version = "0.1.0", path = "../storage/codecs" }
|
||||
|
||||
revm-primitives = { version="1.0.0", features = ["serde"] }
|
||||
revm-primitives = { version = "1.0.0", features = ["serde"] }
|
||||
|
||||
# ethereum
|
||||
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false }
|
||||
@@ -61,6 +57,10 @@ triehash = "0.8"
|
||||
plain_hasher = "0.2"
|
||||
hash-db = "0.15"
|
||||
|
||||
# used for clap value parser
|
||||
eyre = "0.6"
|
||||
shellexpand = "3.0"
|
||||
|
||||
# arbitrary utils
|
||||
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
|
||||
proptest = { version = "1.0", optional = true }
|
||||
@@ -81,7 +81,11 @@ proptest-derive = "0.3"
|
||||
# https://github.com/paradigmxyz/reth/pull/177#discussion_r1021172198
|
||||
secp256k1 = "0.24.2"
|
||||
criterion = "0.4.0"
|
||||
pprof = { version = "0.11", features = ["flamegraph", "frame-pointer", "criterion"] }
|
||||
pprof = { version = "0.11", features = [
|
||||
"flamegraph",
|
||||
"frame-pointer",
|
||||
"criterion",
|
||||
] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
@@ -585,10 +585,9 @@ impl ForkCondition {
|
||||
mod tests {
|
||||
use crate::{
|
||||
AllGenesisFormats, Chain, ChainSpec, ChainSpecBuilder, ForkCondition, ForkHash, ForkId,
|
||||
Genesis, Hardfork, Head, GOERLI, H256, MAINNET, SEPOLIA,
|
||||
Genesis, Hardfork, Head, GOERLI, H256, MAINNET, SEPOLIA, U256,
|
||||
};
|
||||
use ethers_core::types as EtherType;
|
||||
use revm_primitives::U256;
|
||||
fn test_fork_ids(spec: &ChainSpec, cases: &[(Head, ForkId)]) {
|
||||
for (block, expected_id) in cases {
|
||||
let computed_id = spec.fork_id(block);
|
||||
|
||||
@@ -43,10 +43,6 @@ itertools = "0.10.5"
|
||||
rayon = "1.6.0"
|
||||
num-traits = "0.2.15"
|
||||
|
||||
# trie
|
||||
cita_trie = "4.0.0"
|
||||
hasher = "0.1.4"
|
||||
|
||||
# arbitrary utils
|
||||
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
|
||||
proptest = { version = "1.0", optional = true }
|
||||
|
||||
@@ -10,10 +10,11 @@ use reth_interfaces::test_utils::generators::{
|
||||
random_transition_range,
|
||||
};
|
||||
use reth_primitives::{Account, Address, SealedBlock, H256};
|
||||
use reth_provider::trie::DBTrieLoader;
|
||||
use reth_stages::{
|
||||
stages::{AccountHashingStage, StorageHashingStage},
|
||||
test_utils::TestTransaction,
|
||||
DBTrieLoader, ExecInput, Stage, UnwindInput,
|
||||
ExecInput, Stage, UnwindInput,
|
||||
};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
|
||||
@@ -53,7 +53,6 @@ mod error;
|
||||
mod id;
|
||||
mod pipeline;
|
||||
mod stage;
|
||||
mod trie;
|
||||
mod util;
|
||||
|
||||
/// The real database type we use in Reth using MDBX.
|
||||
@@ -77,7 +76,6 @@ pub use error::*;
|
||||
pub use id::*;
|
||||
pub use pipeline::*;
|
||||
pub use stage::*;
|
||||
pub use trie::DBTrieLoader;
|
||||
|
||||
// NOTE: Needed so the link in the module-level rustdoc works.
|
||||
#[allow(unused_extern_crates)]
|
||||
|
||||
@@ -9,9 +9,9 @@ use reth_db::{
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_executor::{execution_result::AccountChangeSet, executor::Executor};
|
||||
use reth_executor::executor::Executor;
|
||||
use reth_interfaces::provider::ProviderError;
|
||||
use reth_primitives::{Address, Block, ChainSpec, Hardfork, StorageEntry, H256, U256};
|
||||
use reth_primitives::{Address, Block, ChainSpec, U256};
|
||||
use reth_provider::{LatestStateProviderRef, StateProvider, Transaction};
|
||||
use reth_revm::database::{State, SubState};
|
||||
use tracing::*;
|
||||
@@ -119,7 +119,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
|
||||
let mut state_provider = SubState::new(State::new(LatestStateProviderRef::new(&**tx)));
|
||||
|
||||
// Fetch transactions, execute them and generate results
|
||||
let mut block_change_patches = Vec::with_capacity(block_batch.len());
|
||||
let mut changesets = Vec::with_capacity(block_batch.len());
|
||||
for (header, td, body, ommers, withdrawals) in block_batch.into_iter() {
|
||||
let block_number = header.number;
|
||||
tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block.");
|
||||
@@ -162,117 +162,11 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> {
|
||||
Some(signers),
|
||||
)
|
||||
.map_err(|error| StageError::ExecutionError { block: block_number, error })?;
|
||||
block_change_patches.push((changeset, block_number));
|
||||
changesets.push(changeset);
|
||||
}
|
||||
|
||||
// Get last tx count so that we can know amount of transaction in the block.
|
||||
let mut current_transition_id = tx.get_block_transition(last_block)?;
|
||||
info!(target: "sync::stages::execution", current_transition_id, blocks = block_change_patches.len(), "Inserting execution results");
|
||||
|
||||
// apply changes to plain database.
|
||||
for (results, block_number) in block_change_patches.into_iter() {
|
||||
let spurious_dragon_active = self
|
||||
.executor
|
||||
.chain_spec
|
||||
.fork(Hardfork::SpuriousDragon)
|
||||
.active_at_block(block_number);
|
||||
// insert state change set
|
||||
for result in results.tx_changesets.into_iter() {
|
||||
for (address, account_change_set) in result.changeset.into_iter() {
|
||||
let AccountChangeSet { account, wipe_storage, storage } = account_change_set;
|
||||
// apply account change to db. Updates AccountChangeSet and PlainAccountState
|
||||
// tables.
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset");
|
||||
account.apply_to_db(
|
||||
&**tx,
|
||||
address,
|
||||
current_transition_id,
|
||||
spurious_dragon_active,
|
||||
)?;
|
||||
|
||||
let storage_id = TransitionIdAddress((current_transition_id, address));
|
||||
|
||||
// cast key to H256 and trace the change
|
||||
let storage = storage
|
||||
.into_iter()
|
||||
.map(|(key, (old_value,new_value))| {
|
||||
let hkey = H256(key.to_be_bytes());
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset");
|
||||
(hkey, old_value,new_value)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut cursor_storage_changeset =
|
||||
tx.cursor_write::<tables::StorageChangeSet>()?;
|
||||
cursor_storage_changeset.seek_exact(storage_id)?;
|
||||
|
||||
if wipe_storage {
|
||||
// iterate over storage and save them before entry is deleted.
|
||||
tx.cursor_read::<tables::PlainStorageState>()?
|
||||
.walk(Some(address))?
|
||||
.take_while(|res| {
|
||||
res.as_ref().map(|(k, _)| *k == address).unwrap_or_default()
|
||||
})
|
||||
.try_for_each(|entry| {
|
||||
let (_, old_value) = entry?;
|
||||
cursor_storage_changeset.append(storage_id, old_value)
|
||||
})?;
|
||||
|
||||
// delete all entries
|
||||
tx.delete::<tables::PlainStorageState>(address, None)?;
|
||||
|
||||
// insert storage changeset
|
||||
for (key, _, new_value) in storage {
|
||||
// old values are already cleared.
|
||||
if new_value != U256::ZERO {
|
||||
tx.put::<tables::PlainStorageState>(
|
||||
address,
|
||||
StorageEntry { key, value: new_value },
|
||||
)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// insert storage changeset
|
||||
for (key, old_value, new_value) in storage {
|
||||
let old_entry = StorageEntry { key, value: old_value };
|
||||
let new_entry = StorageEntry { key, value: new_value };
|
||||
// insert into StorageChangeSet
|
||||
cursor_storage_changeset.append(storage_id, old_entry)?;
|
||||
|
||||
// Always delete old value as duplicate table, put will not override it
|
||||
tx.delete::<tables::PlainStorageState>(address, Some(old_entry))?;
|
||||
if new_value != U256::ZERO {
|
||||
tx.put::<tables::PlainStorageState>(address, new_entry)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// insert bytecode
|
||||
for (hash, bytecode) in result.new_bytecodes.into_iter() {
|
||||
// make different types of bytecode. Checked and maybe even analyzed (needs to
|
||||
// be packed). Currently save only raw bytes.
|
||||
let bytecode = bytecode.bytes();
|
||||
trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode");
|
||||
tx.put::<tables::Bytecodes>(hash, bytecode[..bytecode.len()].to_vec())?;
|
||||
|
||||
// NOTE: bytecode bytes are not inserted in change set and can be found in
|
||||
// separate table
|
||||
}
|
||||
current_transition_id += 1;
|
||||
}
|
||||
|
||||
// If there are any post block changes, we will add account changesets to db.
|
||||
for (address, changeset) in results.block_changesets.into_iter() {
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward");
|
||||
changeset.apply_to_db(
|
||||
&**tx,
|
||||
address,
|
||||
current_transition_id,
|
||||
spurious_dragon_active,
|
||||
)?;
|
||||
}
|
||||
current_transition_id += 1;
|
||||
}
|
||||
// put execution results to database
|
||||
tx.insert_execution_result(changesets, &self.executor.chain_spec, last_block)?;
|
||||
|
||||
let done = !capped;
|
||||
info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished");
|
||||
@@ -413,7 +307,8 @@ mod tests {
|
||||
models::AccountBeforeTx,
|
||||
};
|
||||
use reth_primitives::{
|
||||
hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, H160, MAINNET, U256,
|
||||
hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, StorageEntry, H160,
|
||||
H256, MAINNET, U256,
|
||||
};
|
||||
use reth_provider::insert_canonical_block;
|
||||
use reth_rlp::Decodable;
|
||||
|
||||
@@ -7,11 +7,7 @@ use reth_db::{
|
||||
};
|
||||
use reth_primitives::{keccak256, Account, Address};
|
||||
use reth_provider::Transaction;
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
fmt::Debug,
|
||||
ops::Range,
|
||||
};
|
||||
use std::{collections::BTreeMap, fmt::Debug, ops::Range};
|
||||
use tracing::*;
|
||||
|
||||
/// The [`StageId`] of the account hashing stage.
|
||||
@@ -180,38 +176,15 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
let mut plain_accounts = tx.cursor_read::<tables::PlainAccountState>()?;
|
||||
let mut hashed_accounts = tx.cursor_write::<tables::HashedAccount>()?;
|
||||
|
||||
// Aggregate all transition changesets and and make list of account that have been
|
||||
// changed.
|
||||
tx.cursor_read::<tables::AccountChangeSet>()?
|
||||
.walk_range(from_transition..to_transition)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
// fold all account to one set of changed accounts
|
||||
.fold(BTreeSet::new(), |mut accounts: BTreeSet<Address>, (_, account_before)| {
|
||||
accounts.insert(account_before.address);
|
||||
accounts
|
||||
})
|
||||
.into_iter()
|
||||
// iterate over plain state and get newest value.
|
||||
// Assumption we are okay to make is that plainstate represent
|
||||
// `previous_stage_progress` state.
|
||||
.map(|address| {
|
||||
plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
.try_for_each(|(address, account)| -> Result<(), StageError> {
|
||||
let hashed_address = keccak256(address);
|
||||
if let Some(account) = account {
|
||||
hashed_accounts.upsert(hashed_address, account)?
|
||||
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
|
||||
hashed_accounts.delete_current()?;
|
||||
}
|
||||
Ok(())
|
||||
})?;
|
||||
let lists = tx.get_addresses_of_changed_accounts(from_transition, to_transition)?;
|
||||
// iterate over plain state and get newest value.
|
||||
// Assumption we are okay to make is that plainstate represent
|
||||
// `previous_stage_progress` state.
|
||||
let accounts = tx.get_plainstate_accounts(lists.into_iter())?;
|
||||
// insert and hash accounts to hashing table
|
||||
tx.insert_account_for_hashing(accounts.into_iter())?;
|
||||
}
|
||||
|
||||
info!(target: "sync::stages::hashing_account", "Stage finished");
|
||||
|
||||
@@ -9,10 +9,7 @@ use reth_db::{
|
||||
};
|
||||
use reth_primitives::{keccak256, Address, StorageEntry, H256, U256};
|
||||
use reth_provider::Transaction;
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
fmt::Debug,
|
||||
};
|
||||
use std::{collections::BTreeMap, fmt::Debug};
|
||||
use tracing::*;
|
||||
|
||||
/// The [`StageId`] of the storage hashing stage.
|
||||
@@ -133,70 +130,15 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut plain_storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
let mut hashed_storage = tx.cursor_dup_write::<tables::HashedStorage>()?;
|
||||
|
||||
// Aggregate all transition changesets and and make list of storages that have been
|
||||
// changed.
|
||||
tx.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk_range(
|
||||
TransitionIdAddress((from_transition, Address::zero()))..
|
||||
TransitionIdAddress((to_transition, Address::zero())),
|
||||
)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
// fold all storages and save its old state so we can remove it from HashedStorage
|
||||
// it is needed as it is dup table.
|
||||
.fold(
|
||||
BTreeMap::new(),
|
||||
|mut accounts: BTreeMap<Address, BTreeSet<H256>>,
|
||||
(TransitionIdAddress((_, address)), storage_entry)| {
|
||||
accounts.entry(address).or_default().insert(storage_entry.key);
|
||||
accounts
|
||||
},
|
||||
)
|
||||
.into_iter()
|
||||
// iterate over plain state and get newest storage value.
|
||||
// Assumption we are okay with is that plain state represent
|
||||
// `previous_stage_progress` state.
|
||||
.map(|(address, storage)| {
|
||||
let res = (
|
||||
keccak256(address),
|
||||
storage
|
||||
.into_iter()
|
||||
.map(|key| {
|
||||
Ok::<Option<_>, reth_db::Error>(
|
||||
plain_storage
|
||||
.seek_by_key_subkey(address, key)?
|
||||
.filter(|v| v.key == key)
|
||||
.map(|ret| (keccak256(key), ret.value)),
|
||||
)
|
||||
})
|
||||
.collect::<Result<Vec<Option<_>>, _>>()?
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<BTreeMap<_, _>>(),
|
||||
);
|
||||
Ok::<_, reth_db::Error>(res)
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()?
|
||||
.into_iter()
|
||||
// Hash the address and key and apply them to HashedStorage (if Storage is None
|
||||
// just remove it);
|
||||
.try_for_each(|(hashed_address, storage)| {
|
||||
storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> {
|
||||
if hashed_storage
|
||||
.seek_by_key_subkey(hashed_address, key)?
|
||||
.filter(|entry| entry.key == key)
|
||||
.is_some()
|
||||
{
|
||||
hashed_storage.delete_current()?;
|
||||
}
|
||||
|
||||
hashed_storage.upsert(hashed_address, StorageEntry { key, value: val })?;
|
||||
Ok(())
|
||||
})
|
||||
})?;
|
||||
let lists =
|
||||
tx.get_addresses_and_keys_of_changed_storages(from_transition, to_transition)?;
|
||||
// iterate over plain state and get newest storage value.
|
||||
// Assumption we are okay with is that plain state represent
|
||||
// `previous_stage_progress` state.
|
||||
let storages = tx.get_plainstate_storages(lists.into_iter())?;
|
||||
tx.insert_storage_for_hashing(storages.into_iter())?;
|
||||
}
|
||||
|
||||
info!(target: "sync::stages::hashing_storage", "Stage finished");
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
|
||||
use itertools::Itertools;
|
||||
use reth_db::{
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
database::{Database, DatabaseGAT},
|
||||
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey},
|
||||
models::ShardedKey,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut, DbTxMutGAT},
|
||||
TransitionList,
|
||||
@@ -57,52 +56,10 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
|
||||
std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress);
|
||||
let to_transition = tx.get_block_transition(to_block)?;
|
||||
|
||||
let account_changesets = tx
|
||||
.cursor_read::<tables::AccountChangeSet>()?
|
||||
.walk(Some(from_transition))?
|
||||
.take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default())
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let account_changeset_lists = account_changesets
|
||||
.into_iter()
|
||||
// fold all account to one set of changed accounts
|
||||
.fold(
|
||||
BTreeMap::new(),
|
||||
|mut accounts: BTreeMap<Address, Vec<u64>>, (index, account)| {
|
||||
accounts.entry(account.address).or_default().push(index);
|
||||
accounts
|
||||
},
|
||||
);
|
||||
// insert indexes to AccontHistory.
|
||||
for (address, mut indices) in account_changeset_lists {
|
||||
let mut last_shard = take_last_account_shard(tx, address)?;
|
||||
last_shard.append(&mut indices);
|
||||
// chunk indices and insert them in shards of N size.
|
||||
let mut chunks = last_shard
|
||||
.iter()
|
||||
.chunks(NUM_OF_INDICES_IN_SHARD)
|
||||
.into_iter()
|
||||
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
|
||||
.collect::<Vec<_>>();
|
||||
let last_chunk = chunks.pop();
|
||||
|
||||
chunks.into_iter().try_for_each(|list| {
|
||||
tx.put::<tables::AccountHistory>(
|
||||
ShardedKey::new(
|
||||
address,
|
||||
*list.last().expect("Chuck does not return empty list") as TransitionId,
|
||||
),
|
||||
TransitionList::new(list).expect("Indices are presorted and not empty"),
|
||||
)
|
||||
})?;
|
||||
// Insert last list with u64::MAX
|
||||
if let Some(last_list) = last_chunk {
|
||||
tx.put::<tables::AccountHistory>(
|
||||
ShardedKey::new(address, u64::MAX),
|
||||
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
|
||||
)?
|
||||
}
|
||||
}
|
||||
let indices =
|
||||
tx.get_account_transition_ids_from_changeset(from_transition, to_transition)?;
|
||||
// Insert changeset to history index
|
||||
tx.insert_account_history_index(indices)?;
|
||||
|
||||
info!(target: "sync::stages::index_account_history", "Stage finished");
|
||||
Ok(ExecOutput { stage_progress: to_block, done: true })
|
||||
@@ -155,23 +112,6 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Load last shard and check if it is full and remove if it is not. If list is empty, last shard
|
||||
/// was full or there is no shards at all.
|
||||
pub fn take_last_account_shard<DB: Database>(
|
||||
tx: &Transaction<'_, DB>,
|
||||
address: Address,
|
||||
) -> Result<Vec<u64>, StageError> {
|
||||
let mut cursor = tx.cursor_read::<tables::AccountHistory>()?;
|
||||
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
|
||||
if let Some((shard_key, list)) = last {
|
||||
// delete old shard so new one can be inserted.
|
||||
tx.delete::<tables::AccountHistory>(shard_key, None)?;
|
||||
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
|
||||
return Ok(list)
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Unwind all history shards. For boundary shard, remove it from database and
|
||||
/// return last part of shard with still valid items. If all full shard were removed, return list
|
||||
/// would be empty.
|
||||
@@ -212,7 +152,7 @@ pub fn unwind_account_history_shards<DB: Database>(
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
|
||||
use reth_db::models::AccountBeforeTx;
|
||||
use reth_db::models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx};
|
||||
use reth_primitives::{hex_literal::hex, H160};
|
||||
|
||||
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
|
||||
use itertools::Itertools;
|
||||
use reth_db::{
|
||||
cursor::{DbCursorRO, DbCursorRW},
|
||||
database::{Database, DatabaseGAT},
|
||||
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey},
|
||||
models::storage_sharded_key::StorageShardedKey,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut, DbTxMutGAT},
|
||||
TransitionList,
|
||||
@@ -57,58 +56,9 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
|
||||
std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress);
|
||||
let to_transition = tx.get_block_transition(to_block)?;
|
||||
|
||||
let storage_chageset = tx
|
||||
.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk(Some((from_transition, Address::zero()).into()))?
|
||||
.take_while(|res| {
|
||||
res.as_ref().map(|(k, _)| k.transition_id() < to_transition).unwrap_or_default()
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// fold all storages to one set of changes
|
||||
let storage_changeset_lists = storage_chageset.into_iter().fold(
|
||||
BTreeMap::new(),
|
||||
|mut storages: BTreeMap<(Address, H256), Vec<u64>>, (index, storage)| {
|
||||
storages
|
||||
.entry((index.address(), storage.key))
|
||||
.or_default()
|
||||
.push(index.transition_id());
|
||||
storages
|
||||
},
|
||||
);
|
||||
|
||||
for ((address, storage_key), mut indices) in storage_changeset_lists {
|
||||
let mut last_shard = take_last_storage_shard(tx, address, storage_key)?;
|
||||
last_shard.append(&mut indices);
|
||||
|
||||
// chunk indices and insert them in shards of N size.
|
||||
let mut chunks = last_shard
|
||||
.iter()
|
||||
.chunks(NUM_OF_INDICES_IN_SHARD)
|
||||
.into_iter()
|
||||
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
|
||||
.collect::<Vec<_>>();
|
||||
let last_chunk = chunks.pop();
|
||||
|
||||
// chunk indices and insert them in shards of N size.
|
||||
chunks.into_iter().try_for_each(|list| {
|
||||
tx.put::<tables::StorageHistory>(
|
||||
StorageShardedKey::new(
|
||||
address,
|
||||
storage_key,
|
||||
*list.last().expect("Chuck does not return empty list") as TransitionId,
|
||||
),
|
||||
TransitionList::new(list).expect("Indices are presorted and not empty"),
|
||||
)
|
||||
})?;
|
||||
// Insert last list with u64::MAX
|
||||
if let Some(last_list) = last_chunk {
|
||||
tx.put::<tables::StorageHistory>(
|
||||
StorageShardedKey::new(address, storage_key, u64::MAX),
|
||||
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
let indices =
|
||||
tx.get_storage_transition_ids_from_changeset(from_transition, to_transition)?;
|
||||
tx.insert_storage_history_index(indices)?;
|
||||
|
||||
info!(target: "sync::stages::index_storage_history", "Stage finished");
|
||||
Ok(ExecOutput { stage_progress: to_block, done: true })
|
||||
@@ -164,24 +114,6 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
|
||||
}
|
||||
}
|
||||
|
||||
/// Load last shard and check if it is full and remove if it is not. If list is empty, last shard
|
||||
/// was full or there is no shards at all.
|
||||
pub fn take_last_storage_shard<DB: Database>(
|
||||
tx: &Transaction<'_, DB>,
|
||||
address: Address,
|
||||
storage_key: H256,
|
||||
) -> Result<Vec<u64>, StageError> {
|
||||
let mut cursor = tx.cursor_read::<tables::StorageHistory>()?;
|
||||
let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
|
||||
if let Some((storage_shard_key, list)) = last {
|
||||
// delete old shard so new one can be inserted.
|
||||
tx.delete::<tables::StorageHistory>(storage_shard_key, None)?;
|
||||
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
|
||||
return Ok(list)
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Unwind all history shards. For boundary shard, remove it from database and
|
||||
/// return last part of shard with still valid items. If all full shard were removed, return list
|
||||
/// would be empty but this does not mean that there is none shard left but that there is no
|
||||
@@ -226,7 +158,9 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::test_utils::{TestTransaction, PREV_STAGE_ID};
|
||||
use reth_db::models::{ShardedKey, TransitionIdAddress};
|
||||
use reth_db::models::{
|
||||
storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey, TransitionIdAddress,
|
||||
};
|
||||
use reth_primitives::{hex_literal::hex, StorageEntry, H160, U256};
|
||||
|
||||
const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001"));
|
||||
|
||||
@@ -1,10 +1,7 @@
|
||||
use crate::{
|
||||
trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
|
||||
UnwindOutput,
|
||||
};
|
||||
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
|
||||
use reth_db::{database::Database, tables, transaction::DbTx};
|
||||
use reth_interfaces::consensus;
|
||||
use reth_provider::Transaction;
|
||||
use reth_provider::{trie::DBTrieLoader, Transaction};
|
||||
use std::fmt::Debug;
|
||||
use tracing::*;
|
||||
|
||||
|
||||
@@ -13,18 +13,31 @@ reth-primitives = { path = "../../primitives" }
|
||||
reth-interfaces = { path = "../../interfaces" }
|
||||
reth-revm-primitives = { path = "../../revm/revm-primitives" }
|
||||
reth-db = { path = "../db" }
|
||||
reth-tracing = {path = "../../tracing"}
|
||||
reth-rlp = {path = "../../rlp"}
|
||||
|
||||
revm-primitives = "1.0.0"
|
||||
|
||||
# trie
|
||||
cita_trie = "4.0.0"
|
||||
hasher = "0.1.4"
|
||||
|
||||
# misc
|
||||
thiserror = "1.0.37"
|
||||
auto_impl = "1.0"
|
||||
itertools = "0.10"
|
||||
|
||||
# feature test-utils
|
||||
parking_lot = { version = "0.12", optional = true }
|
||||
revm-primitives = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
reth-db = { path = "../db", features = ["test-utils"] }
|
||||
parking_lot = "0.12"
|
||||
proptest = { version = "1.0" }
|
||||
assert_matches = "1.5"
|
||||
|
||||
# trie
|
||||
triehash = "0.8"
|
||||
|
||||
[features]
|
||||
bench = []
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
//! Output of execution.
|
||||
|
||||
use reth_db::{models::AccountBeforeTx, tables, transaction::DbTxMut, Error as DbError};
|
||||
use reth_primitives::{Account, Address, Receipt, H256, U256};
|
||||
use revm::primitives::Bytecode;
|
||||
use revm_primitives::Bytecode;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
/// Execution Result containing vector of transaction changesets
|
||||
@@ -22,6 +22,12 @@ pub use providers::{
|
||||
LatestStateProviderRef, ShareableDatabase,
|
||||
};
|
||||
|
||||
/// Merkle trie
|
||||
pub mod trie;
|
||||
|
||||
/// Execution result
|
||||
pub mod execution_result;
|
||||
|
||||
/// Helper types for interacting with the database
|
||||
mod transaction;
|
||||
pub use transaction::{Transaction, TransactionError};
|
||||
|
||||
@@ -1,19 +1,36 @@
|
||||
#![allow(dead_code)]
|
||||
use itertools::Itertools;
|
||||
use reth_db::{
|
||||
cursor::DbCursorRO,
|
||||
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
|
||||
database::{Database, DatabaseGAT},
|
||||
models::StoredBlockBody,
|
||||
models::{
|
||||
sharded_key,
|
||||
storage_sharded_key::{self, StorageShardedKey},
|
||||
ShardedKey, StoredBlockBody, TransitionIdAddress,
|
||||
},
|
||||
table::Table,
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
TransitionList,
|
||||
};
|
||||
use reth_interfaces::{db::Error as DbError, provider::ProviderError};
|
||||
use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber, U256};
|
||||
use reth_primitives::{
|
||||
keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock,
|
||||
StorageEntry, TransitionId, TxNumber, H256, U256,
|
||||
};
|
||||
use reth_tracing::tracing::{info, trace};
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
fmt::Debug,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
insert_canonical_block,
|
||||
trie::{DBTrieLoader, TrieError},
|
||||
};
|
||||
|
||||
use crate::execution_result::{AccountChangeSet, ExecutionResult};
|
||||
|
||||
/// A container for any DB transaction that will open a new inner transaction when the current
|
||||
/// one is committed.
|
||||
// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in
|
||||
@@ -71,6 +88,11 @@ where
|
||||
Ok(Self { db, tx: Some(db.tx_mut()?) })
|
||||
}
|
||||
|
||||
/// Creates a new container with given database and transaction handles.
|
||||
pub fn new_raw(db: &'this DB, tx: <DB as DatabaseGAT<'this>>::TXMut) -> Self {
|
||||
Self { db, tx: Some(tx) }
|
||||
}
|
||||
|
||||
/// Accessor to the internal Database
|
||||
pub fn inner(&self) -> &'this DB {
|
||||
self.db
|
||||
@@ -213,6 +235,505 @@ where
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load last shard and check if it is full and remove if it is not. If list is empty, last
|
||||
/// shard was full or there is no shards at all.
|
||||
fn take_last_account_shard(&self, address: Address) -> Result<Vec<u64>, TransactionError> {
|
||||
let mut cursor = self.cursor_read::<tables::AccountHistory>()?;
|
||||
let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?;
|
||||
if let Some((shard_key, list)) = last {
|
||||
// delete old shard so new one can be inserted.
|
||||
self.delete::<tables::AccountHistory>(shard_key, None)?;
|
||||
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
|
||||
return Ok(list)
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Load last shard and check if it is full and remove if it is not. If list is empty, last
|
||||
/// shard was full or there is no shards at all.
|
||||
pub fn take_last_storage_shard(
|
||||
&self,
|
||||
address: Address,
|
||||
storage_key: H256,
|
||||
) -> Result<Vec<u64>, TransactionError> {
|
||||
let mut cursor = self.cursor_read::<tables::StorageHistory>()?;
|
||||
let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?;
|
||||
if let Some((storage_shard_key, list)) = last {
|
||||
// delete old shard so new one can be inserted.
|
||||
self.delete::<tables::StorageHistory>(storage_shard_key, None)?;
|
||||
let list = list.iter(0).map(|i| i as u64).collect::<Vec<_>>();
|
||||
return Ok(list)
|
||||
}
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
/// Stages impl
|
||||
impl<'this, DB> Transaction<'this, DB>
|
||||
where
|
||||
DB: Database,
|
||||
{
|
||||
/// Insert full block and make it canonical
|
||||
///
|
||||
/// This is atomic operation and transaction will do one commit at the end of the function.
|
||||
pub fn insert_block(
|
||||
&mut self,
|
||||
block: &SealedBlock,
|
||||
chain_spec: &ChainSpec,
|
||||
changeset: ExecutionResult,
|
||||
) -> Result<(), TransactionError> {
|
||||
// Header, Body, SenderRecovery, TD, TxLookup stages
|
||||
let (from, to) = insert_canonical_block(self.deref_mut(), block, false).unwrap();
|
||||
|
||||
let parent_block_number = block.number - 1;
|
||||
|
||||
// execution stage
|
||||
self.insert_execution_result(vec![changeset], chain_spec, parent_block_number)?;
|
||||
|
||||
// storage hashing stage
|
||||
{
|
||||
let lists = self.get_addresses_and_keys_of_changed_storages(from, to)?;
|
||||
let storages = self.get_plainstate_storages(lists.into_iter())?;
|
||||
self.insert_storage_for_hashing(storages.into_iter())?;
|
||||
}
|
||||
|
||||
// account hashing stage
|
||||
{
|
||||
let lists = self.get_addresses_of_changed_accounts(from, to)?;
|
||||
let accounts = self.get_plainstate_accounts(lists.into_iter())?;
|
||||
self.insert_account_for_hashing(accounts.into_iter())?;
|
||||
}
|
||||
|
||||
// merkle tree
|
||||
{
|
||||
let current_root = self.get_header(parent_block_number)?.state_root;
|
||||
let loader = DBTrieLoader::default();
|
||||
let root = loader.update_root(self, current_root, from..to)?;
|
||||
if root != block.state_root {
|
||||
return Err(TransactionError::StateTrieRootMismatch {
|
||||
got: root,
|
||||
expected: block.state_root,
|
||||
block_number: block.number,
|
||||
block_hash: block.hash(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// account history stage
|
||||
{
|
||||
let indices = self.get_account_transition_ids_from_changeset(from, to)?;
|
||||
self.insert_account_history_index(indices)?;
|
||||
}
|
||||
|
||||
// storage history stage
|
||||
{
|
||||
let indices = self.get_storage_transition_ids_from_changeset(from, to)?;
|
||||
self.insert_storage_history_index(indices)?;
|
||||
}
|
||||
|
||||
// commit block to database
|
||||
self.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterate over account changesets and return all account address that were changed.
|
||||
pub fn get_addresses_and_keys_of_changed_storages(
|
||||
&self,
|
||||
from: TransitionId,
|
||||
to: TransitionId,
|
||||
) -> Result<BTreeMap<Address, BTreeSet<H256>>, TransactionError> {
|
||||
Ok(self
|
||||
.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk_range(
|
||||
TransitionIdAddress((from, Address::zero()))..
|
||||
TransitionIdAddress((to, Address::zero())),
|
||||
)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
// fold all storages and save its old state so we can remove it from HashedStorage
|
||||
// it is needed as it is dup table.
|
||||
.fold(
|
||||
BTreeMap::new(),
|
||||
|mut accounts: BTreeMap<Address, BTreeSet<H256>>,
|
||||
(TransitionIdAddress((_, address)), storage_entry)| {
|
||||
accounts.entry(address).or_default().insert(storage_entry.key);
|
||||
accounts
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// Get plainstate storages
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn get_plainstate_storages(
|
||||
&self,
|
||||
iter: impl IntoIterator<Item = (Address, impl IntoIterator<Item = H256>)>,
|
||||
) -> Result<Vec<(Address, Vec<(H256, U256)>)>, TransactionError> {
|
||||
let mut plain_storage = self.cursor_dup_read::<tables::PlainStorageState>()?;
|
||||
|
||||
iter.into_iter()
|
||||
.map(|(address, storage)| {
|
||||
storage
|
||||
.into_iter()
|
||||
.map(|key| -> Result<_, TransactionError> {
|
||||
let ret = plain_storage
|
||||
.seek_by_key_subkey(address, key)?
|
||||
.filter(|v| v.key == key)
|
||||
.unwrap_or_default();
|
||||
Ok((key, ret.value))
|
||||
})
|
||||
.collect::<Result<Vec<(_, _)>, _>>()
|
||||
.map(|storage| (address, storage))
|
||||
})
|
||||
.collect::<Result<Vec<(_, _)>, _>>()
|
||||
}
|
||||
|
||||
/// iterate over storages and insert them to hashing table
|
||||
pub fn insert_storage_for_hashing(
|
||||
&self,
|
||||
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = (H256, U256)>)>,
|
||||
) -> Result<(), TransactionError> {
|
||||
// hash values
|
||||
let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
|
||||
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, (key, value)| {
|
||||
map.insert(keccak256(key), value);
|
||||
map
|
||||
});
|
||||
map.insert(keccak256(address), storage);
|
||||
map
|
||||
});
|
||||
|
||||
let mut hashed_storage = self.cursor_dup_write::<tables::HashedStorage>()?;
|
||||
// Hash the address and key and apply them to HashedStorage (if Storage is None
|
||||
// just remove it);
|
||||
hashed.into_iter().try_for_each(|(hashed_address, storage)| {
|
||||
storage.into_iter().try_for_each(|(key, value)| -> Result<(), TransactionError> {
|
||||
if hashed_storage
|
||||
.seek_by_key_subkey(hashed_address, key)?
|
||||
.filter(|entry| entry.key == key)
|
||||
.is_some()
|
||||
{
|
||||
hashed_storage.delete_current()?;
|
||||
}
|
||||
|
||||
if value != U256::ZERO {
|
||||
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Iterate over account changesets and return all account address that were changed.
|
||||
pub fn get_addresses_of_changed_accounts(
|
||||
&self,
|
||||
from: TransitionId,
|
||||
to: TransitionId,
|
||||
) -> Result<BTreeSet<Address>, TransactionError> {
|
||||
Ok(self
|
||||
.cursor_read::<tables::AccountChangeSet>()?
|
||||
.walk_range(from..to)?
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
// fold all account to one set of changed accounts
|
||||
.fold(BTreeSet::new(), |mut accounts: BTreeSet<Address>, (_, account_before)| {
|
||||
accounts.insert(account_before.address);
|
||||
accounts
|
||||
}))
|
||||
}
|
||||
|
||||
/// Get plainstate account from iterator
|
||||
pub fn get_plainstate_accounts(
|
||||
&self,
|
||||
iter: impl IntoIterator<Item = Address>,
|
||||
) -> Result<Vec<(Address, Option<Account>)>, TransactionError> {
|
||||
let mut plain_accounts = self.cursor_read::<tables::PlainAccountState>()?;
|
||||
Ok(iter
|
||||
.into_iter()
|
||||
.map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
|
||||
.collect::<Result<Vec<_>, _>>()?)
|
||||
}
|
||||
|
||||
/// iterate over accounts and insert them to hashing table
|
||||
pub fn insert_account_for_hashing(
|
||||
&self,
|
||||
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
|
||||
) -> Result<(), TransactionError> {
|
||||
let mut hashed_accounts = self.cursor_write::<tables::HashedAccount>()?;
|
||||
|
||||
let hashes_accounts = accounts.into_iter().fold(
|
||||
BTreeMap::new(),
|
||||
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
|
||||
map.insert(keccak256(address), account);
|
||||
map
|
||||
},
|
||||
);
|
||||
|
||||
hashes_accounts.into_iter().try_for_each(
|
||||
|(hashed_address, account)| -> Result<(), TransactionError> {
|
||||
if let Some(account) = account {
|
||||
hashed_accounts.upsert(hashed_address, account)?
|
||||
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
|
||||
hashed_accounts.delete_current()?;
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all transaction ids where account got changed.
|
||||
pub fn get_storage_transition_ids_from_changeset(
|
||||
&self,
|
||||
from: TransitionId,
|
||||
to: TransitionId,
|
||||
) -> Result<BTreeMap<(Address, H256), Vec<u64>>, TransactionError> {
|
||||
let storage_changeset = self
|
||||
.cursor_read::<tables::StorageChangeSet>()?
|
||||
.walk(Some((from, Address::zero()).into()))?
|
||||
.take_while(|res| res.as_ref().map(|(k, _)| k.transition_id() < to).unwrap_or_default())
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// fold all storages to one set of changes
|
||||
let storage_changeset_lists = storage_changeset.into_iter().fold(
|
||||
BTreeMap::new(),
|
||||
|mut storages: BTreeMap<(Address, H256), Vec<u64>>, (index, storage)| {
|
||||
storages
|
||||
.entry((index.address(), storage.key))
|
||||
.or_default()
|
||||
.push(index.transition_id());
|
||||
storages
|
||||
},
|
||||
);
|
||||
|
||||
Ok(storage_changeset_lists)
|
||||
}
|
||||
|
||||
/// Get all transaction ids where account got changed.
|
||||
pub fn get_account_transition_ids_from_changeset(
|
||||
&self,
|
||||
from: TransitionId,
|
||||
to: TransitionId,
|
||||
) -> Result<BTreeMap<Address, Vec<u64>>, TransactionError> {
|
||||
let account_changesets = self
|
||||
.cursor_read::<tables::AccountChangeSet>()?
|
||||
.walk(Some(from))?
|
||||
.take_while(|res| res.as_ref().map(|(k, _)| *k < to).unwrap_or_default())
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
let account_transtions = account_changesets
|
||||
.into_iter()
|
||||
// fold all account to one set of changed accounts
|
||||
.fold(
|
||||
BTreeMap::new(),
|
||||
|mut accounts: BTreeMap<Address, Vec<u64>>, (index, account)| {
|
||||
accounts.entry(account.address).or_default().push(index);
|
||||
accounts
|
||||
},
|
||||
);
|
||||
|
||||
Ok(account_transtions)
|
||||
}
|
||||
|
||||
/// Insert storage change index to database. Used inside StorageHistoryIndex stage
|
||||
pub fn insert_storage_history_index(
|
||||
&self,
|
||||
storage_transitions: BTreeMap<(Address, H256), Vec<u64>>,
|
||||
) -> Result<(), TransactionError> {
|
||||
for ((address, storage_key), mut indices) in storage_transitions {
|
||||
let mut last_shard = self.take_last_storage_shard(address, storage_key)?;
|
||||
last_shard.append(&mut indices);
|
||||
|
||||
// chunk indices and insert them in shards of N size.
|
||||
let mut chunks = last_shard
|
||||
.iter()
|
||||
.chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD)
|
||||
.into_iter()
|
||||
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
|
||||
.collect::<Vec<_>>();
|
||||
let last_chunk = chunks.pop();
|
||||
|
||||
// chunk indices and insert them in shards of N size.
|
||||
chunks.into_iter().try_for_each(|list| {
|
||||
self.put::<tables::StorageHistory>(
|
||||
StorageShardedKey::new(
|
||||
address,
|
||||
storage_key,
|
||||
*list.last().expect("Chuck does not return empty list") as TransitionId,
|
||||
),
|
||||
TransitionList::new(list).expect("Indices are presorted and not empty"),
|
||||
)
|
||||
})?;
|
||||
// Insert last list with u64::MAX
|
||||
if let Some(last_list) = last_chunk {
|
||||
self.put::<tables::StorageHistory>(
|
||||
StorageShardedKey::new(address, storage_key, u64::MAX),
|
||||
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert account change index to database. Used inside AccountHistoryIndex stage
|
||||
pub fn insert_account_history_index(
|
||||
&self,
|
||||
account_transitions: BTreeMap<Address, Vec<u64>>,
|
||||
) -> Result<(), TransactionError> {
|
||||
// insert indexes to AccountHistory.
|
||||
for (address, mut indices) in account_transitions {
|
||||
let mut last_shard = self.take_last_account_shard(address)?;
|
||||
last_shard.append(&mut indices);
|
||||
// chunk indices and insert them in shards of N size.
|
||||
let mut chunks = last_shard
|
||||
.iter()
|
||||
.chunks(sharded_key::NUM_OF_INDICES_IN_SHARD)
|
||||
.into_iter()
|
||||
.map(|chunks| chunks.map(|i| *i as usize).collect::<Vec<usize>>())
|
||||
.collect::<Vec<_>>();
|
||||
let last_chunk = chunks.pop();
|
||||
|
||||
chunks.into_iter().try_for_each(|list| {
|
||||
self.put::<tables::AccountHistory>(
|
||||
ShardedKey::new(
|
||||
address,
|
||||
*list.last().expect("Chuck does not return empty list") as TransitionId,
|
||||
),
|
||||
TransitionList::new(list).expect("Indices are presorted and not empty"),
|
||||
)
|
||||
})?;
|
||||
// Insert last list with u64::MAX
|
||||
if let Some(last_list) = last_chunk {
|
||||
self.put::<tables::AccountHistory>(
|
||||
ShardedKey::new(address, u64::MAX),
|
||||
TransitionList::new(last_list).expect("Indices are presorted and not empty"),
|
||||
)?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Used inside execution stage to commit created account storage changesets for transaction or
|
||||
/// block state change.
|
||||
pub fn insert_execution_result(
|
||||
&self,
|
||||
changesets: Vec<ExecutionResult>,
|
||||
chain_spec: &ChainSpec,
|
||||
parent_block_number: u64,
|
||||
) -> Result<(), TransactionError> {
|
||||
// Get last tx count so that we can know amount of transaction in the block.
|
||||
let mut current_transition_id = self
|
||||
.get::<tables::BlockTransitionIndex>(parent_block_number)?
|
||||
.ok_or(ProviderError::BlockTransition { block_number: parent_block_number })?;
|
||||
|
||||
info!(target: "sync::stages::execution", current_transition_id, blocks = changesets.len(), "Inserting execution results");
|
||||
|
||||
// apply changes to plain database.
|
||||
let mut block_number = parent_block_number;
|
||||
for results in changesets.into_iter() {
|
||||
block_number += 1;
|
||||
let spurious_dragon_active =
|
||||
chain_spec.fork(Hardfork::SpuriousDragon).active_at_block(block_number);
|
||||
// insert state change set
|
||||
for result in results.tx_changesets.into_iter() {
|
||||
for (address, account_change_set) in result.changeset.into_iter() {
|
||||
let AccountChangeSet { account, wipe_storage, storage } = account_change_set;
|
||||
// apply account change to db. Updates AccountChangeSet and PlainAccountState
|
||||
// tables.
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset");
|
||||
account.apply_to_db(
|
||||
&**self,
|
||||
address,
|
||||
current_transition_id,
|
||||
spurious_dragon_active,
|
||||
)?;
|
||||
|
||||
let storage_id = TransitionIdAddress((current_transition_id, address));
|
||||
|
||||
// cast key to H256 and trace the change
|
||||
let storage = storage
|
||||
.into_iter()
|
||||
.map(|(key, (old_value,new_value))| {
|
||||
let hkey = H256(key.to_be_bytes());
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset");
|
||||
(hkey, old_value,new_value)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut cursor_storage_changeset =
|
||||
self.cursor_write::<tables::StorageChangeSet>()?;
|
||||
cursor_storage_changeset.seek_exact(storage_id)?;
|
||||
|
||||
if wipe_storage {
|
||||
// iterate over storage and save them before entry is deleted.
|
||||
self.cursor_read::<tables::PlainStorageState>()?
|
||||
.walk(Some(address))?
|
||||
.take_while(|res| {
|
||||
res.as_ref().map(|(k, _)| *k == address).unwrap_or_default()
|
||||
})
|
||||
.try_for_each(|entry| {
|
||||
let (_, old_value) = entry?;
|
||||
cursor_storage_changeset.append(storage_id, old_value)
|
||||
})?;
|
||||
|
||||
// delete all entries
|
||||
self.delete::<tables::PlainStorageState>(address, None)?;
|
||||
|
||||
// insert storage changeset
|
||||
for (key, _, new_value) in storage {
|
||||
// old values are already cleared.
|
||||
if new_value != U256::ZERO {
|
||||
self.put::<tables::PlainStorageState>(
|
||||
address,
|
||||
StorageEntry { key, value: new_value },
|
||||
)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// insert storage changeset
|
||||
for (key, old_value, new_value) in storage {
|
||||
let old_entry = StorageEntry { key, value: old_value };
|
||||
let new_entry = StorageEntry { key, value: new_value };
|
||||
// insert into StorageChangeSet
|
||||
cursor_storage_changeset.append(storage_id, old_entry)?;
|
||||
|
||||
// Always delete old value as duplicate table, put will not override it
|
||||
self.delete::<tables::PlainStorageState>(address, Some(old_entry))?;
|
||||
if new_value != U256::ZERO {
|
||||
self.put::<tables::PlainStorageState>(address, new_entry)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// insert bytecode
|
||||
for (hash, bytecode) in result.new_bytecodes.into_iter() {
|
||||
// make different types of bytecode. Checked and maybe even analyzed (needs to
|
||||
// be packed). Currently save only raw bytes.
|
||||
let bytecode = bytecode.bytes();
|
||||
trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode");
|
||||
self.put::<tables::Bytecodes>(hash, bytecode[..bytecode.len()].to_vec())?;
|
||||
|
||||
// NOTE: bytecode bytes are not inserted in change set and can be found in
|
||||
// separate table
|
||||
}
|
||||
current_transition_id += 1;
|
||||
}
|
||||
|
||||
// If there are any post block changes, we will add account changesets to db.
|
||||
for (address, changeset) in results.block_changesets.into_iter() {
|
||||
trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward");
|
||||
changeset.apply_to_db(
|
||||
&**self,
|
||||
address,
|
||||
current_transition_id,
|
||||
spurious_dragon_active,
|
||||
)?;
|
||||
}
|
||||
current_transition_id += 1;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// An error that can occur when using the transaction container
|
||||
@@ -224,4 +745,19 @@ pub enum TransactionError {
|
||||
/// The transaction encountered a database integrity error.
|
||||
#[error("A database integrity error occurred: {0}")]
|
||||
DatabaseIntegrity(#[from] ProviderError),
|
||||
/// The transaction encountered merkle trie error.
|
||||
#[error("Merkle trie calculation error: {0}")]
|
||||
MerkleTrie(#[from] TrieError),
|
||||
/// Root mismatch
|
||||
#[error("Merkle trie root mismatch on block: #{block_number:?} {block_hash:?}. got: {got:?} expected:{got:?}")]
|
||||
StateTrieRootMismatch {
|
||||
/// Expected root
|
||||
expected: H256,
|
||||
/// Calculated root
|
||||
got: H256,
|
||||
/// Block number
|
||||
block_number: BlockNumber,
|
||||
/// Block hash
|
||||
block_hash: BlockHash,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::Transaction;
|
||||
use cita_trie::{PatriciaTrie, Trie};
|
||||
use hasher::HasherKeccak;
|
||||
use reth_db::{
|
||||
@@ -11,18 +12,19 @@ use reth_primitives::{
|
||||
keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId,
|
||||
H256, KECCAK_EMPTY, U256,
|
||||
};
|
||||
use reth_provider::Transaction;
|
||||
use reth_rlp::{
|
||||
encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable,
|
||||
EMPTY_STRING_CODE,
|
||||
};
|
||||
use reth_tracing::tracing::*;
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
ops::Range,
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
/// Merkle Trie error types
|
||||
#[allow(missing_docs)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TrieError {
|
||||
#[error("Some error occurred: {0}")]
|
||||
@@ -410,9 +412,8 @@ mod tests {
|
||||
hex_literal::hex,
|
||||
keccak256,
|
||||
proofs::{genesis_state_root, KeccakHasher, EMPTY_ROOT},
|
||||
Address, ChainSpec,
|
||||
Address, ChainSpec, MAINNET,
|
||||
};
|
||||
use reth_staged_sync::utils::chainspec::chain_spec_value_parser;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use triehash::sec_trie_root;
|
||||
|
||||
@@ -553,7 +554,7 @@ mod tests {
|
||||
let trie = DBTrieLoader::default();
|
||||
let db = create_test_rw_db();
|
||||
let mut tx = Transaction::new(db.as_ref()).unwrap();
|
||||
let ChainSpec { genesis, .. } = chain_spec_value_parser("mainnet").unwrap();
|
||||
let ChainSpec { genesis, .. } = MAINNET.clone();
|
||||
|
||||
// Insert account state
|
||||
for (address, account) in &genesis.alloc {
|
||||
@@ -4,34 +4,39 @@ use reth_db::{
|
||||
transaction::{DbTx, DbTxMut},
|
||||
};
|
||||
use reth_interfaces::{provider::ProviderError, Result};
|
||||
use reth_primitives::{SealedBlock, U256};
|
||||
use reth_primitives::{SealedBlock, TransitionId, U256};
|
||||
|
||||
/// Insert block data into corresponding tables. Used mainly for testing & internal tooling.
|
||||
///
|
||||
///
|
||||
/// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodies] tables.
|
||||
/// Inserts blocks data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers],
|
||||
/// and transactions data to [tables::TxSenders], [tables::Transactions],
|
||||
/// [tables::BlockBodies] and [tables::BlockBodies]
|
||||
/// Inserts header data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers].
|
||||
/// and transactions data to [tables::TxSenders], [tables::Transactions], [tables::TxHashNumber].
|
||||
/// and transition indexes to [tables::BlockTransitionIndex] and [tables::TxTransitionIndex].
|
||||
/// And block data [tables::BlockBodies], [tables::BlockBodies] and [tables::BlockWithdrawals].
|
||||
///
|
||||
/// Return [TransitionId] `(from,to)`
|
||||
pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
|
||||
tx: &TX,
|
||||
block: &SealedBlock,
|
||||
has_block_reward: bool,
|
||||
parent_tx_num_transition_id: Option<(u64, u64)>,
|
||||
) -> Result<()> {
|
||||
) -> Result<(TransitionId, TransitionId)> {
|
||||
tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
|
||||
// Put header with canonical hashes.
|
||||
tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
|
||||
tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
|
||||
tx.put::<tables::HeaderTD>(
|
||||
block.number,
|
||||
if has_block_reward {
|
||||
U256::ZERO
|
||||
} else {
|
||||
U256::from(58_750_000_000_000_000_000_000_u128) + block.difficulty
|
||||
}
|
||||
.into(),
|
||||
)?;
|
||||
|
||||
// total difficulty
|
||||
let ttd = if block.number == 0 {
|
||||
U256::ZERO
|
||||
} else {
|
||||
let parent_block_number = block.number - 1;
|
||||
let parent_ttd = tx.get::<tables::HeaderTD>(parent_block_number)?.unwrap_or_default();
|
||||
parent_ttd.0 + block.difficulty
|
||||
};
|
||||
|
||||
tx.put::<tables::HeaderTD>(block.number, ttd.into())?;
|
||||
|
||||
// insert body ommers data
|
||||
if !block.ommers.is_empty() {
|
||||
@@ -56,7 +61,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
|
||||
.ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?;
|
||||
(prev_body.start_tx_id + prev_body.tx_count, last_transition_id)
|
||||
};
|
||||
|
||||
let from_transition = transition_id;
|
||||
// insert body data
|
||||
tx.put::<tables::BlockBodies>(
|
||||
block.number,
|
||||
@@ -65,9 +70,11 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
|
||||
|
||||
for transaction in block.body.iter() {
|
||||
let rec_tx = transaction.clone().into_ecrecovered().unwrap();
|
||||
let hash = rec_tx.hash();
|
||||
tx.put::<tables::TxSenders>(current_tx_id, rec_tx.signer())?;
|
||||
tx.put::<tables::Transactions>(current_tx_id, rec_tx.into())?;
|
||||
tx.put::<tables::TxTransitionIndex>(current_tx_id, transition_id)?;
|
||||
tx.put::<tables::TxHashNumber>(hash, current_tx_id)?;
|
||||
transition_id += 1;
|
||||
current_tx_id += 1;
|
||||
}
|
||||
@@ -88,7 +95,8 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
|
||||
}
|
||||
tx.put::<tables::BlockTransitionIndex>(block.number, transition_id)?;
|
||||
|
||||
Ok(())
|
||||
let to_transition = transition_id;
|
||||
Ok((from_transition, to_transition))
|
||||
}
|
||||
|
||||
/// Inserts canonical block in blockchain. Parent tx num and transition id is taken from
|
||||
@@ -97,6 +105,6 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>(
|
||||
tx: &TX,
|
||||
block: &SealedBlock,
|
||||
has_block_reward: bool,
|
||||
) -> Result<()> {
|
||||
) -> Result<(TransitionId, TransitionId)> {
|
||||
insert_block(tx, block, has_block_reward, None)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user