diff --git a/Cargo.lock b/Cargo.lock index 8668d9c0ad..f42fba15ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4858,6 +4858,7 @@ dependencies = [ "crunchy", "derive_more", "ethers-core", + "eyre", "fixed-hash", "hash-db", "hex", @@ -4879,6 +4880,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "shellexpand", "strum", "sucds", "test-fuzz", @@ -4892,14 +4894,22 @@ dependencies = [ name = "reth-provider" version = "0.1.0" dependencies = [ + "assert_matches", "auto_impl", + "cita_trie", + "hasher", + "itertools 0.10.5", "parking_lot 0.12.1", + "proptest", "reth-db", "reth-interfaces", "reth-primitives", "reth-revm-primitives", + "reth-rlp", + "reth-tracing", "revm-primitives", "thiserror", + "triehash", ] [[package]] @@ -5116,11 +5126,9 @@ dependencies = [ "arbitrary", "assert_matches", "async-trait", - "cita_trie", "criterion", "eyre", "futures-util", - "hasher", "itertools 0.10.5", "metrics", "num-traits", diff --git a/crates/executor/src/lib.rs b/crates/executor/src/lib.rs index 523fc7a0f0..6d28d96482 100644 --- a/crates/executor/src/lib.rs +++ b/crates/executor/src/lib.rs @@ -9,7 +9,7 @@ pub mod eth_dao_fork; -/// Execution result types -pub mod execution_result; +/// Execution result types. +pub use reth_provider::execution_result; /// Executor pub mod executor; diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index df03f8b5e3..91bd5718fe 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -9,15 +9,11 @@ description = "Commonly used types in reth." [dependencies] # reth -reth-rlp = { path = "../rlp", features = [ - "std", - "derive", - "ethereum-types", -] } +reth-rlp = { path = "../rlp", features = ["std", "derive", "ethereum-types"] } reth-rlp-derive = { path = "../rlp/rlp-derive" } reth-codecs = { version = "0.1.0", path = "../storage/codecs" } -revm-primitives = { version="1.0.0", features = ["serde"] } +revm-primitives = { version = "1.0.0", features = ["serde"] } # ethereum ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false } @@ -61,6 +57,10 @@ triehash = "0.8" plain_hasher = "0.2" hash-db = "0.15" +# used for clap value parser +eyre = "0.6" +shellexpand = "3.0" + # arbitrary utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } proptest = { version = "1.0", optional = true } @@ -81,7 +81,11 @@ proptest-derive = "0.3" # https://github.com/paradigmxyz/reth/pull/177#discussion_r1021172198 secp256k1 = "0.24.2" criterion = "0.4.0" -pprof = { version = "0.11", features = ["flamegraph", "frame-pointer", "criterion"] } +pprof = { version = "0.11", features = [ + "flamegraph", + "frame-pointer", + "criterion", +] } [features] default = [] diff --git a/crates/primitives/src/chain/spec.rs b/crates/primitives/src/chain/spec.rs index 06a91fc4e0..98ca08338e 100644 --- a/crates/primitives/src/chain/spec.rs +++ b/crates/primitives/src/chain/spec.rs @@ -585,10 +585,9 @@ impl ForkCondition { mod tests { use crate::{ AllGenesisFormats, Chain, ChainSpec, ChainSpecBuilder, ForkCondition, ForkHash, ForkId, - Genesis, Hardfork, Head, GOERLI, H256, MAINNET, SEPOLIA, + Genesis, Hardfork, Head, GOERLI, H256, MAINNET, SEPOLIA, U256, }; use ethers_core::types as EtherType; - use revm_primitives::U256; fn test_fork_ids(spec: &ChainSpec, cases: &[(Head, ForkId)]) { for (block, expected_id) in cases { let computed_id = spec.fork_id(block); diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index eb14a8e7c1..3a50de8a2a 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -43,10 +43,6 @@ itertools = "0.10.5" rayon = "1.6.0" num-traits = "0.2.15" -# trie -cita_trie = "4.0.0" -hasher = "0.1.4" - # arbitrary utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } proptest = { version = "1.0", optional = true } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 34876d031a..aaed854bd8 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -10,10 +10,11 @@ use reth_interfaces::test_utils::generators::{ random_transition_range, }; use reth_primitives::{Account, Address, SealedBlock, H256}; +use reth_provider::trie::DBTrieLoader; use reth_stages::{ stages::{AccountHashingStage, StorageHashingStage}, test_utils::TestTransaction, - DBTrieLoader, ExecInput, Stage, UnwindInput, + ExecInput, Stage, UnwindInput, }; use std::{ collections::BTreeMap, diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 21dd5b57c7..8e5e75ec4b 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -53,7 +53,6 @@ mod error; mod id; mod pipeline; mod stage; -mod trie; mod util; /// The real database type we use in Reth using MDBX. @@ -77,7 +76,6 @@ pub use error::*; pub use id::*; pub use pipeline::*; pub use stage::*; -pub use trie::DBTrieLoader; // NOTE: Needed so the link in the module-level rustdoc works. #[allow(unused_extern_crates)] diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index f7b58eb9b8..633bfec4ad 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -9,9 +9,9 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_executor::{execution_result::AccountChangeSet, executor::Executor}; +use reth_executor::executor::Executor; use reth_interfaces::provider::ProviderError; -use reth_primitives::{Address, Block, ChainSpec, Hardfork, StorageEntry, H256, U256}; +use reth_primitives::{Address, Block, ChainSpec, U256}; use reth_provider::{LatestStateProviderRef, StateProvider, Transaction}; use reth_revm::database::{State, SubState}; use tracing::*; @@ -119,7 +119,7 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { let mut state_provider = SubState::new(State::new(LatestStateProviderRef::new(&**tx))); // Fetch transactions, execute them and generate results - let mut block_change_patches = Vec::with_capacity(block_batch.len()); + let mut changesets = Vec::with_capacity(block_batch.len()); for (header, td, body, ommers, withdrawals) in block_batch.into_iter() { let block_number = header.number; tracing::trace!(target: "sync::stages::execution", ?block_number, "Execute block."); @@ -162,117 +162,11 @@ impl<'a, S: StateProvider> ExecutionStage<'a, S> { Some(signers), ) .map_err(|error| StageError::ExecutionError { block: block_number, error })?; - block_change_patches.push((changeset, block_number)); + changesets.push(changeset); } - // Get last tx count so that we can know amount of transaction in the block. - let mut current_transition_id = tx.get_block_transition(last_block)?; - info!(target: "sync::stages::execution", current_transition_id, blocks = block_change_patches.len(), "Inserting execution results"); - - // apply changes to plain database. - for (results, block_number) in block_change_patches.into_iter() { - let spurious_dragon_active = self - .executor - .chain_spec - .fork(Hardfork::SpuriousDragon) - .active_at_block(block_number); - // insert state change set - for result in results.tx_changesets.into_iter() { - for (address, account_change_set) in result.changeset.into_iter() { - let AccountChangeSet { account, wipe_storage, storage } = account_change_set; - // apply account change to db. Updates AccountChangeSet and PlainAccountState - // tables. - trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset"); - account.apply_to_db( - &**tx, - address, - current_transition_id, - spurious_dragon_active, - )?; - - let storage_id = TransitionIdAddress((current_transition_id, address)); - - // cast key to H256 and trace the change - let storage = storage - .into_iter() - .map(|(key, (old_value,new_value))| { - let hkey = H256(key.to_be_bytes()); - trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset"); - (hkey, old_value,new_value) - }) - .collect::>(); - - let mut cursor_storage_changeset = - tx.cursor_write::()?; - cursor_storage_changeset.seek_exact(storage_id)?; - - if wipe_storage { - // iterate over storage and save them before entry is deleted. - tx.cursor_read::()? - .walk(Some(address))? - .take_while(|res| { - res.as_ref().map(|(k, _)| *k == address).unwrap_or_default() - }) - .try_for_each(|entry| { - let (_, old_value) = entry?; - cursor_storage_changeset.append(storage_id, old_value) - })?; - - // delete all entries - tx.delete::(address, None)?; - - // insert storage changeset - for (key, _, new_value) in storage { - // old values are already cleared. - if new_value != U256::ZERO { - tx.put::( - address, - StorageEntry { key, value: new_value }, - )?; - } - } - } else { - // insert storage changeset - for (key, old_value, new_value) in storage { - let old_entry = StorageEntry { key, value: old_value }; - let new_entry = StorageEntry { key, value: new_value }; - // insert into StorageChangeSet - cursor_storage_changeset.append(storage_id, old_entry)?; - - // Always delete old value as duplicate table, put will not override it - tx.delete::(address, Some(old_entry))?; - if new_value != U256::ZERO { - tx.put::(address, new_entry)?; - } - } - } - } - // insert bytecode - for (hash, bytecode) in result.new_bytecodes.into_iter() { - // make different types of bytecode. Checked and maybe even analyzed (needs to - // be packed). Currently save only raw bytes. - let bytecode = bytecode.bytes(); - trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode"); - tx.put::(hash, bytecode[..bytecode.len()].to_vec())?; - - // NOTE: bytecode bytes are not inserted in change set and can be found in - // separate table - } - current_transition_id += 1; - } - - // If there are any post block changes, we will add account changesets to db. - for (address, changeset) in results.block_changesets.into_iter() { - trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward"); - changeset.apply_to_db( - &**tx, - address, - current_transition_id, - spurious_dragon_active, - )?; - } - current_transition_id += 1; - } + // put execution results to database + tx.insert_execution_result(changesets, &self.executor.chain_spec, last_block)?; let done = !capped; info!(target: "sync::stages::execution", stage_progress = end_block, done, "Sync iteration finished"); @@ -413,7 +307,8 @@ mod tests { models::AccountBeforeTx, }; use reth_primitives::{ - hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, H160, MAINNET, U256, + hex_literal::hex, keccak256, Account, ChainSpecBuilder, SealedBlock, StorageEntry, H160, + H256, MAINNET, U256, }; use reth_provider::insert_canonical_block; use reth_rlp::Decodable; diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index ffba350c87..ceaedf30c0 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -7,11 +7,7 @@ use reth_db::{ }; use reth_primitives::{keccak256, Account, Address}; use reth_provider::Transaction; -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Debug, - ops::Range, -}; +use std::{collections::BTreeMap, fmt::Debug, ops::Range}; use tracing::*; /// The [`StageId`] of the account hashing stage. @@ -180,38 +176,15 @@ impl Stage for AccountHashingStage { break } } else { - let mut plain_accounts = tx.cursor_read::()?; - let mut hashed_accounts = tx.cursor_write::()?; - // Aggregate all transition changesets and and make list of account that have been // changed. - tx.cursor_read::()? - .walk_range(from_transition..to_transition)? - .collect::, _>>()? - .into_iter() - // fold all account to one set of changed accounts - .fold(BTreeSet::new(), |mut accounts: BTreeSet
, (_, account_before)| { - accounts.insert(account_before.address); - accounts - }) - .into_iter() - // iterate over plain state and get newest value. - // Assumption we are okay to make is that plainstate represent - // `previous_stage_progress` state. - .map(|address| { - plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))) - }) - .collect::, _>>()? - .into_iter() - .try_for_each(|(address, account)| -> Result<(), StageError> { - let hashed_address = keccak256(address); - if let Some(account) = account { - hashed_accounts.upsert(hashed_address, account)? - } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { - hashed_accounts.delete_current()?; - } - Ok(()) - })?; + let lists = tx.get_addresses_of_changed_accounts(from_transition, to_transition)?; + // iterate over plain state and get newest value. + // Assumption we are okay to make is that plainstate represent + // `previous_stage_progress` state. + let accounts = tx.get_plainstate_accounts(lists.into_iter())?; + // insert and hash accounts to hashing table + tx.insert_account_for_hashing(accounts.into_iter())?; } info!(target: "sync::stages::hashing_account", "Stage finished"); diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index e7b5d3c49a..7e54b37fcd 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -9,10 +9,7 @@ use reth_db::{ }; use reth_primitives::{keccak256, Address, StorageEntry, H256, U256}; use reth_provider::Transaction; -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Debug, -}; +use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; /// The [`StageId`] of the storage hashing stage. @@ -133,70 +130,15 @@ impl Stage for StorageHashingStage { } } } else { - let mut plain_storage = tx.cursor_dup_read::()?; - let mut hashed_storage = tx.cursor_dup_write::()?; - // Aggregate all transition changesets and and make list of storages that have been // changed. - tx.cursor_read::()? - .walk_range( - TransitionIdAddress((from_transition, Address::zero())).. - TransitionIdAddress((to_transition, Address::zero())), - )? - .collect::, _>>()? - .into_iter() - // fold all storages and save its old state so we can remove it from HashedStorage - // it is needed as it is dup table. - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, - (TransitionIdAddress((_, address)), storage_entry)| { - accounts.entry(address).or_default().insert(storage_entry.key); - accounts - }, - ) - .into_iter() - // iterate over plain state and get newest storage value. - // Assumption we are okay with is that plain state represent - // `previous_stage_progress` state. - .map(|(address, storage)| { - let res = ( - keccak256(address), - storage - .into_iter() - .map(|key| { - Ok::, reth_db::Error>( - plain_storage - .seek_by_key_subkey(address, key)? - .filter(|v| v.key == key) - .map(|ret| (keccak256(key), ret.value)), - ) - }) - .collect::>, _>>()? - .into_iter() - .flatten() - .collect::>(), - ); - Ok::<_, reth_db::Error>(res) - }) - .collect::, _>>()? - .into_iter() - // Hash the address and key and apply them to HashedStorage (if Storage is None - // just remove it); - .try_for_each(|(hashed_address, storage)| { - storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> { - if hashed_storage - .seek_by_key_subkey(hashed_address, key)? - .filter(|entry| entry.key == key) - .is_some() - { - hashed_storage.delete_current()?; - } - - hashed_storage.upsert(hashed_address, StorageEntry { key, value: val })?; - Ok(()) - }) - })?; + let lists = + tx.get_addresses_and_keys_of_changed_storages(from_transition, to_transition)?; + // iterate over plain state and get newest storage value. + // Assumption we are okay with is that plain state represent + // `previous_stage_progress` state. + let storages = tx.get_plainstate_storages(lists.into_iter())?; + tx.insert_storage_for_hashing(storages.into_iter())?; } info!(target: "sync::stages::hashing_storage", "Stage finished"); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index ccfda386f4..7cfa4fdcd9 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,9 +1,8 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; -use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey}, + models::ShardedKey, tables, transaction::{DbTx, DbTxMut, DbTxMutGAT}, TransitionList, @@ -57,52 +56,10 @@ impl Stage for IndexAccountHistoryStage { std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress); let to_transition = tx.get_block_transition(to_block)?; - let account_changesets = tx - .cursor_read::()? - .walk(Some(from_transition))? - .take_while(|res| res.as_ref().map(|(k, _)| *k < to_transition).unwrap_or_default()) - .collect::, _>>()?; - - let account_changeset_lists = account_changesets - .into_iter() - // fold all account to one set of changed accounts - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap>, (index, account)| { - accounts.entry(account.address).or_default().push(index); - accounts - }, - ); - // insert indexes to AccontHistory. - for (address, mut indices) in account_changeset_lists { - let mut last_shard = take_last_account_shard(tx, address)?; - last_shard.append(&mut indices); - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - chunks.into_iter().try_for_each(|list| { - tx.put::( - ShardedKey::new( - address, - *list.last().expect("Chuck does not return empty list") as TransitionId, - ), - TransitionList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - tx.put::( - ShardedKey::new(address, u64::MAX), - TransitionList::new(last_list).expect("Indices are presorted and not empty"), - )? - } - } + let indices = + tx.get_account_transition_ids_from_changeset(from_transition, to_transition)?; + // Insert changeset to history index + tx.insert_account_history_index(indices)?; info!(target: "sync::stages::index_account_history", "Stage finished"); Ok(ExecOutput { stage_progress: to_block, done: true }) @@ -155,23 +112,6 @@ impl Stage for IndexAccountHistoryStage { } } -/// Load last shard and check if it is full and remove if it is not. If list is empty, last shard -/// was full or there is no shards at all. -pub fn take_last_account_shard( - tx: &Transaction<'_, DB>, - address: Address, -) -> Result, StageError> { - let mut cursor = tx.cursor_read::()?; - let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; - if let Some((shard_key, list)) = last { - // delete old shard so new one can be inserted. - tx.delete::(shard_key, None)?; - let list = list.iter(0).map(|i| i as u64).collect::>(); - return Ok(list) - } - Ok(Vec::new()) -} - /// Unwind all history shards. For boundary shard, remove it from database and /// return last part of shard with still valid items. If all full shard were removed, return list /// would be empty. @@ -212,7 +152,7 @@ pub fn unwind_account_history_shards( mod tests { use super::*; use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; - use reth_db::models::AccountBeforeTx; + use reth_db::models::{sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx}; use reth_primitives::{hex_literal::hex, H160}; const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001")); diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index efef9e065f..e29091030e 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,9 +1,8 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; -use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, - models::{sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey}, + models::storage_sharded_key::StorageShardedKey, tables, transaction::{DbTx, DbTxMut, DbTxMutGAT}, TransitionList, @@ -57,58 +56,9 @@ impl Stage for IndexStorageHistoryStage { std::cmp::min(stage_progress + self.commit_threshold, previous_stage_progress); let to_transition = tx.get_block_transition(to_block)?; - let storage_chageset = tx - .cursor_read::()? - .walk(Some((from_transition, Address::zero()).into()))? - .take_while(|res| { - res.as_ref().map(|(k, _)| k.transition_id() < to_transition).unwrap_or_default() - }) - .collect::, _>>()?; - - // fold all storages to one set of changes - let storage_changeset_lists = storage_chageset.into_iter().fold( - BTreeMap::new(), - |mut storages: BTreeMap<(Address, H256), Vec>, (index, storage)| { - storages - .entry((index.address(), storage.key)) - .or_default() - .push(index.transition_id()); - storages - }, - ); - - for ((address, storage_key), mut indices) in storage_changeset_lists { - let mut last_shard = take_last_storage_shard(tx, address, storage_key)?; - last_shard.append(&mut indices); - - // chunk indices and insert them in shards of N size. - let mut chunks = last_shard - .iter() - .chunks(NUM_OF_INDICES_IN_SHARD) - .into_iter() - .map(|chunks| chunks.map(|i| *i as usize).collect::>()) - .collect::>(); - let last_chunk = chunks.pop(); - - // chunk indices and insert them in shards of N size. - chunks.into_iter().try_for_each(|list| { - tx.put::( - StorageShardedKey::new( - address, - storage_key, - *list.last().expect("Chuck does not return empty list") as TransitionId, - ), - TransitionList::new(list).expect("Indices are presorted and not empty"), - ) - })?; - // Insert last list with u64::MAX - if let Some(last_list) = last_chunk { - tx.put::( - StorageShardedKey::new(address, storage_key, u64::MAX), - TransitionList::new(last_list).expect("Indices are presorted and not empty"), - )?; - } - } + let indices = + tx.get_storage_transition_ids_from_changeset(from_transition, to_transition)?; + tx.insert_storage_history_index(indices)?; info!(target: "sync::stages::index_storage_history", "Stage finished"); Ok(ExecOutput { stage_progress: to_block, done: true }) @@ -164,24 +114,6 @@ impl Stage for IndexStorageHistoryStage { } } -/// Load last shard and check if it is full and remove if it is not. If list is empty, last shard -/// was full or there is no shards at all. -pub fn take_last_storage_shard( - tx: &Transaction<'_, DB>, - address: Address, - storage_key: H256, -) -> Result, StageError> { - let mut cursor = tx.cursor_read::()?; - let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; - if let Some((storage_shard_key, list)) = last { - // delete old shard so new one can be inserted. - tx.delete::(storage_shard_key, None)?; - let list = list.iter(0).map(|i| i as u64).collect::>(); - return Ok(list) - } - Ok(Vec::new()) -} - /// Unwind all history shards. For boundary shard, remove it from database and /// return last part of shard with still valid items. If all full shard were removed, return list /// would be empty but this does not mean that there is none shard left but that there is no @@ -226,7 +158,9 @@ mod tests { use super::*; use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; - use reth_db::models::{ShardedKey, TransitionIdAddress}; + use reth_db::models::{ + storage_sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey, TransitionIdAddress, + }; use reth_primitives::{hex_literal::hex, StorageEntry, H160, U256}; const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001")); diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 884c137a86..ee29dd74d9 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -1,10 +1,7 @@ -use crate::{ - trie::DBTrieLoader, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, - UnwindOutput, -}; +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::{database::Database, tables, transaction::DbTx}; use reth_interfaces::consensus; -use reth_provider::Transaction; +use reth_provider::{trie::DBTrieLoader, Transaction}; use std::fmt::Debug; use tracing::*; diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 509009cbaf..0a1802d14e 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -13,18 +13,31 @@ reth-primitives = { path = "../../primitives" } reth-interfaces = { path = "../../interfaces" } reth-revm-primitives = { path = "../../revm/revm-primitives" } reth-db = { path = "../db" } +reth-tracing = {path = "../../tracing"} +reth-rlp = {path = "../../rlp"} + +revm-primitives = "1.0.0" + +# trie +cita_trie = "4.0.0" +hasher = "0.1.4" # misc thiserror = "1.0.37" auto_impl = "1.0" +itertools = "0.10" # feature test-utils parking_lot = { version = "0.12", optional = true } -revm-primitives = "1.0" [dev-dependencies] reth-db = { path = "../db", features = ["test-utils"] } parking_lot = "0.12" +proptest = { version = "1.0" } +assert_matches = "1.5" + +# trie +triehash = "0.8" [features] bench = [] diff --git a/crates/executor/src/execution_result.rs b/crates/storage/provider/src/execution_result.rs similarity index 99% rename from crates/executor/src/execution_result.rs rename to crates/storage/provider/src/execution_result.rs index 3f1b433fac..747f08eabc 100644 --- a/crates/executor/src/execution_result.rs +++ b/crates/storage/provider/src/execution_result.rs @@ -1,6 +1,8 @@ +//! Output of execution. + use reth_db::{models::AccountBeforeTx, tables, transaction::DbTxMut, Error as DbError}; use reth_primitives::{Account, Address, Receipt, H256, U256}; -use revm::primitives::Bytecode; +use revm_primitives::Bytecode; use std::collections::BTreeMap; /// Execution Result containing vector of transaction changesets diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index d7941257ae..c9f2496dc3 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -22,6 +22,12 @@ pub use providers::{ LatestStateProviderRef, ShareableDatabase, }; +/// Merkle trie +pub mod trie; + +/// Execution result +pub mod execution_result; + /// Helper types for interacting with the database mod transaction; pub use transaction::{Transaction, TransactionError}; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index b1c1898fdc..5da17221cf 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -1,19 +1,36 @@ -#![allow(dead_code)] +use itertools::Itertools; use reth_db::{ - cursor::DbCursorRO, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::{Database, DatabaseGAT}, - models::StoredBlockBody, + models::{ + sharded_key, + storage_sharded_key::{self, StorageShardedKey}, + ShardedKey, StoredBlockBody, TransitionIdAddress, + }, table::Table, tables, transaction::{DbTx, DbTxMut}, + TransitionList, }; use reth_interfaces::{db::Error as DbError, provider::ProviderError}; -use reth_primitives::{BlockHash, BlockNumber, Header, TransitionId, TxNumber, U256}; +use reth_primitives::{ + keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, + StorageEntry, TransitionId, TxNumber, H256, U256, +}; +use reth_tracing::tracing::{info, trace}; use std::{ + collections::{BTreeMap, BTreeSet}, fmt::Debug, ops::{Deref, DerefMut}, }; +use crate::{ + insert_canonical_block, + trie::{DBTrieLoader, TrieError}, +}; + +use crate::execution_result::{AccountChangeSet, ExecutionResult}; + /// A container for any DB transaction that will open a new inner transaction when the current /// one is committed. // NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in @@ -71,6 +88,11 @@ where Ok(Self { db, tx: Some(db.tx_mut()?) }) } + /// Creates a new container with given database and transaction handles. + pub fn new_raw(db: &'this DB, tx: >::TXMut) -> Self { + Self { db, tx: Some(tx) } + } + /// Accessor to the internal Database pub fn inner(&self) -> &'this DB { self.db @@ -213,6 +235,505 @@ where } Ok(()) } + + /// Load last shard and check if it is full and remove if it is not. If list is empty, last + /// shard was full or there is no shards at all. + fn take_last_account_shard(&self, address: Address) -> Result, TransactionError> { + let mut cursor = self.cursor_read::()?; + let last = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; + if let Some((shard_key, list)) = last { + // delete old shard so new one can be inserted. + self.delete::(shard_key, None)?; + let list = list.iter(0).map(|i| i as u64).collect::>(); + return Ok(list) + } + Ok(Vec::new()) + } + + /// Load last shard and check if it is full and remove if it is not. If list is empty, last + /// shard was full or there is no shards at all. + pub fn take_last_storage_shard( + &self, + address: Address, + storage_key: H256, + ) -> Result, TransactionError> { + let mut cursor = self.cursor_read::()?; + let last = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; + if let Some((storage_shard_key, list)) = last { + // delete old shard so new one can be inserted. + self.delete::(storage_shard_key, None)?; + let list = list.iter(0).map(|i| i as u64).collect::>(); + return Ok(list) + } + Ok(Vec::new()) + } +} + +/// Stages impl +impl<'this, DB> Transaction<'this, DB> +where + DB: Database, +{ + /// Insert full block and make it canonical + /// + /// This is atomic operation and transaction will do one commit at the end of the function. + pub fn insert_block( + &mut self, + block: &SealedBlock, + chain_spec: &ChainSpec, + changeset: ExecutionResult, + ) -> Result<(), TransactionError> { + // Header, Body, SenderRecovery, TD, TxLookup stages + let (from, to) = insert_canonical_block(self.deref_mut(), block, false).unwrap(); + + let parent_block_number = block.number - 1; + + // execution stage + self.insert_execution_result(vec![changeset], chain_spec, parent_block_number)?; + + // storage hashing stage + { + let lists = self.get_addresses_and_keys_of_changed_storages(from, to)?; + let storages = self.get_plainstate_storages(lists.into_iter())?; + self.insert_storage_for_hashing(storages.into_iter())?; + } + + // account hashing stage + { + let lists = self.get_addresses_of_changed_accounts(from, to)?; + let accounts = self.get_plainstate_accounts(lists.into_iter())?; + self.insert_account_for_hashing(accounts.into_iter())?; + } + + // merkle tree + { + let current_root = self.get_header(parent_block_number)?.state_root; + let loader = DBTrieLoader::default(); + let root = loader.update_root(self, current_root, from..to)?; + if root != block.state_root { + return Err(TransactionError::StateTrieRootMismatch { + got: root, + expected: block.state_root, + block_number: block.number, + block_hash: block.hash(), + }) + } + } + + // account history stage + { + let indices = self.get_account_transition_ids_from_changeset(from, to)?; + self.insert_account_history_index(indices)?; + } + + // storage history stage + { + let indices = self.get_storage_transition_ids_from_changeset(from, to)?; + self.insert_storage_history_index(indices)?; + } + + // commit block to database + self.commit()?; + Ok(()) + } + + /// Iterate over account changesets and return all account address that were changed. + pub fn get_addresses_and_keys_of_changed_storages( + &self, + from: TransitionId, + to: TransitionId, + ) -> Result>, TransactionError> { + Ok(self + .cursor_read::()? + .walk_range( + TransitionIdAddress((from, Address::zero())).. + TransitionIdAddress((to, Address::zero())), + )? + .collect::, _>>()? + .into_iter() + // fold all storages and save its old state so we can remove it from HashedStorage + // it is needed as it is dup table. + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap>, + (TransitionIdAddress((_, address)), storage_entry)| { + accounts.entry(address).or_default().insert(storage_entry.key); + accounts + }, + )) + } + + /// Get plainstate storages + #[allow(clippy::type_complexity)] + pub fn get_plainstate_storages( + &self, + iter: impl IntoIterator)>, + ) -> Result)>, TransactionError> { + let mut plain_storage = self.cursor_dup_read::()?; + + iter.into_iter() + .map(|(address, storage)| { + storage + .into_iter() + .map(|key| -> Result<_, TransactionError> { + let ret = plain_storage + .seek_by_key_subkey(address, key)? + .filter(|v| v.key == key) + .unwrap_or_default(); + Ok((key, ret.value)) + }) + .collect::, _>>() + .map(|storage| (address, storage)) + }) + .collect::, _>>() + } + + /// iterate over storages and insert them to hashing table + pub fn insert_storage_for_hashing( + &self, + storages: impl IntoIterator)>, + ) -> Result<(), TransactionError> { + // hash values + let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| { + let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, (key, value)| { + map.insert(keccak256(key), value); + map + }); + map.insert(keccak256(address), storage); + map + }); + + let mut hashed_storage = self.cursor_dup_write::()?; + // Hash the address and key and apply them to HashedStorage (if Storage is None + // just remove it); + hashed.into_iter().try_for_each(|(hashed_address, storage)| { + storage.into_iter().try_for_each(|(key, value)| -> Result<(), TransactionError> { + if hashed_storage + .seek_by_key_subkey(hashed_address, key)? + .filter(|entry| entry.key == key) + .is_some() + { + hashed_storage.delete_current()?; + } + + if value != U256::ZERO { + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; + } + Ok(()) + }) + })?; + Ok(()) + } + + /// Iterate over account changesets and return all account address that were changed. + pub fn get_addresses_of_changed_accounts( + &self, + from: TransitionId, + to: TransitionId, + ) -> Result, TransactionError> { + Ok(self + .cursor_read::()? + .walk_range(from..to)? + .collect::, _>>()? + .into_iter() + // fold all account to one set of changed accounts + .fold(BTreeSet::new(), |mut accounts: BTreeSet
, (_, account_before)| { + accounts.insert(account_before.address); + accounts + })) + } + + /// Get plainstate account from iterator + pub fn get_plainstate_accounts( + &self, + iter: impl IntoIterator, + ) -> Result)>, TransactionError> { + let mut plain_accounts = self.cursor_read::()?; + Ok(iter + .into_iter() + .map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v)))) + .collect::, _>>()?) + } + + /// iterate over accounts and insert them to hashing table + pub fn insert_account_for_hashing( + &self, + accounts: impl IntoIterator)>, + ) -> Result<(), TransactionError> { + let mut hashed_accounts = self.cursor_write::()?; + + let hashes_accounts = accounts.into_iter().fold( + BTreeMap::new(), + |mut map: BTreeMap>, (address, account)| { + map.insert(keccak256(address), account); + map + }, + ); + + hashes_accounts.into_iter().try_for_each( + |(hashed_address, account)| -> Result<(), TransactionError> { + if let Some(account) = account { + hashed_accounts.upsert(hashed_address, account)? + } else if hashed_accounts.seek_exact(hashed_address)?.is_some() { + hashed_accounts.delete_current()?; + } + Ok(()) + }, + )?; + Ok(()) + } + + /// Get all transaction ids where account got changed. + pub fn get_storage_transition_ids_from_changeset( + &self, + from: TransitionId, + to: TransitionId, + ) -> Result>, TransactionError> { + let storage_changeset = self + .cursor_read::()? + .walk(Some((from, Address::zero()).into()))? + .take_while(|res| res.as_ref().map(|(k, _)| k.transition_id() < to).unwrap_or_default()) + .collect::, _>>()?; + + // fold all storages to one set of changes + let storage_changeset_lists = storage_changeset.into_iter().fold( + BTreeMap::new(), + |mut storages: BTreeMap<(Address, H256), Vec>, (index, storage)| { + storages + .entry((index.address(), storage.key)) + .or_default() + .push(index.transition_id()); + storages + }, + ); + + Ok(storage_changeset_lists) + } + + /// Get all transaction ids where account got changed. + pub fn get_account_transition_ids_from_changeset( + &self, + from: TransitionId, + to: TransitionId, + ) -> Result>, TransactionError> { + let account_changesets = self + .cursor_read::()? + .walk(Some(from))? + .take_while(|res| res.as_ref().map(|(k, _)| *k < to).unwrap_or_default()) + .collect::, _>>()?; + + let account_transtions = account_changesets + .into_iter() + // fold all account to one set of changed accounts + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap>, (index, account)| { + accounts.entry(account.address).or_default().push(index); + accounts + }, + ); + + Ok(account_transtions) + } + + /// Insert storage change index to database. Used inside StorageHistoryIndex stage + pub fn insert_storage_history_index( + &self, + storage_transitions: BTreeMap<(Address, H256), Vec>, + ) -> Result<(), TransactionError> { + for ((address, storage_key), mut indices) in storage_transitions { + let mut last_shard = self.take_last_storage_shard(address, storage_key)?; + last_shard.append(&mut indices); + + // chunk indices and insert them in shards of N size. + let mut chunks = last_shard + .iter() + .chunks(storage_sharded_key::NUM_OF_INDICES_IN_SHARD) + .into_iter() + .map(|chunks| chunks.map(|i| *i as usize).collect::>()) + .collect::>(); + let last_chunk = chunks.pop(); + + // chunk indices and insert them in shards of N size. + chunks.into_iter().try_for_each(|list| { + self.put::( + StorageShardedKey::new( + address, + storage_key, + *list.last().expect("Chuck does not return empty list") as TransitionId, + ), + TransitionList::new(list).expect("Indices are presorted and not empty"), + ) + })?; + // Insert last list with u64::MAX + if let Some(last_list) = last_chunk { + self.put::( + StorageShardedKey::new(address, storage_key, u64::MAX), + TransitionList::new(last_list).expect("Indices are presorted and not empty"), + )?; + } + } + Ok(()) + } + + /// Insert account change index to database. Used inside AccountHistoryIndex stage + pub fn insert_account_history_index( + &self, + account_transitions: BTreeMap>, + ) -> Result<(), TransactionError> { + // insert indexes to AccountHistory. + for (address, mut indices) in account_transitions { + let mut last_shard = self.take_last_account_shard(address)?; + last_shard.append(&mut indices); + // chunk indices and insert them in shards of N size. + let mut chunks = last_shard + .iter() + .chunks(sharded_key::NUM_OF_INDICES_IN_SHARD) + .into_iter() + .map(|chunks| chunks.map(|i| *i as usize).collect::>()) + .collect::>(); + let last_chunk = chunks.pop(); + + chunks.into_iter().try_for_each(|list| { + self.put::( + ShardedKey::new( + address, + *list.last().expect("Chuck does not return empty list") as TransitionId, + ), + TransitionList::new(list).expect("Indices are presorted and not empty"), + ) + })?; + // Insert last list with u64::MAX + if let Some(last_list) = last_chunk { + self.put::( + ShardedKey::new(address, u64::MAX), + TransitionList::new(last_list).expect("Indices are presorted and not empty"), + )? + } + } + Ok(()) + } + + /// Used inside execution stage to commit created account storage changesets for transaction or + /// block state change. + pub fn insert_execution_result( + &self, + changesets: Vec, + chain_spec: &ChainSpec, + parent_block_number: u64, + ) -> Result<(), TransactionError> { + // Get last tx count so that we can know amount of transaction in the block. + let mut current_transition_id = self + .get::(parent_block_number)? + .ok_or(ProviderError::BlockTransition { block_number: parent_block_number })?; + + info!(target: "sync::stages::execution", current_transition_id, blocks = changesets.len(), "Inserting execution results"); + + // apply changes to plain database. + let mut block_number = parent_block_number; + for results in changesets.into_iter() { + block_number += 1; + let spurious_dragon_active = + chain_spec.fork(Hardfork::SpuriousDragon).active_at_block(block_number); + // insert state change set + for result in results.tx_changesets.into_iter() { + for (address, account_change_set) in result.changeset.into_iter() { + let AccountChangeSet { account, wipe_storage, storage } = account_change_set; + // apply account change to db. Updates AccountChangeSet and PlainAccountState + // tables. + trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset"); + account.apply_to_db( + &**self, + address, + current_transition_id, + spurious_dragon_active, + )?; + + let storage_id = TransitionIdAddress((current_transition_id, address)); + + // cast key to H256 and trace the change + let storage = storage + .into_iter() + .map(|(key, (old_value,new_value))| { + let hkey = H256(key.to_be_bytes()); + trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset"); + (hkey, old_value,new_value) + }) + .collect::>(); + + let mut cursor_storage_changeset = + self.cursor_write::()?; + cursor_storage_changeset.seek_exact(storage_id)?; + + if wipe_storage { + // iterate over storage and save them before entry is deleted. + self.cursor_read::()? + .walk(Some(address))? + .take_while(|res| { + res.as_ref().map(|(k, _)| *k == address).unwrap_or_default() + }) + .try_for_each(|entry| { + let (_, old_value) = entry?; + cursor_storage_changeset.append(storage_id, old_value) + })?; + + // delete all entries + self.delete::(address, None)?; + + // insert storage changeset + for (key, _, new_value) in storage { + // old values are already cleared. + if new_value != U256::ZERO { + self.put::( + address, + StorageEntry { key, value: new_value }, + )?; + } + } + } else { + // insert storage changeset + for (key, old_value, new_value) in storage { + let old_entry = StorageEntry { key, value: old_value }; + let new_entry = StorageEntry { key, value: new_value }; + // insert into StorageChangeSet + cursor_storage_changeset.append(storage_id, old_entry)?; + + // Always delete old value as duplicate table, put will not override it + self.delete::(address, Some(old_entry))?; + if new_value != U256::ZERO { + self.put::(address, new_entry)?; + } + } + } + } + // insert bytecode + for (hash, bytecode) in result.new_bytecodes.into_iter() { + // make different types of bytecode. Checked and maybe even analyzed (needs to + // be packed). Currently save only raw bytes. + let bytecode = bytecode.bytes(); + trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode"); + self.put::(hash, bytecode[..bytecode.len()].to_vec())?; + + // NOTE: bytecode bytes are not inserted in change set and can be found in + // separate table + } + current_transition_id += 1; + } + + // If there are any post block changes, we will add account changesets to db. + for (address, changeset) in results.block_changesets.into_iter() { + trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward"); + changeset.apply_to_db( + &**self, + address, + current_transition_id, + spurious_dragon_active, + )?; + } + current_transition_id += 1; + } + Ok(()) + } } /// An error that can occur when using the transaction container @@ -224,4 +745,19 @@ pub enum TransactionError { /// The transaction encountered a database integrity error. #[error("A database integrity error occurred: {0}")] DatabaseIntegrity(#[from] ProviderError), + /// The transaction encountered merkle trie error. + #[error("Merkle trie calculation error: {0}")] + MerkleTrie(#[from] TrieError), + /// Root mismatch + #[error("Merkle trie root mismatch on block: #{block_number:?} {block_hash:?}. got: {got:?} expected:{got:?}")] + StateTrieRootMismatch { + /// Expected root + expected: H256, + /// Calculated root + got: H256, + /// Block number + block_number: BlockNumber, + /// Block hash + block_hash: BlockHash, + }, } diff --git a/crates/stages/src/trie/mod.rs b/crates/storage/provider/src/trie/mod.rs similarity index 99% rename from crates/stages/src/trie/mod.rs rename to crates/storage/provider/src/trie/mod.rs index 199df9f841..c99689b846 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/storage/provider/src/trie/mod.rs @@ -1,3 +1,4 @@ +use crate::Transaction; use cita_trie::{PatriciaTrie, Trie}; use hasher::HasherKeccak; use reth_db::{ @@ -11,18 +12,19 @@ use reth_primitives::{ keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId, H256, KECCAK_EMPTY, U256, }; -use reth_provider::Transaction; use reth_rlp::{ encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable, EMPTY_STRING_CODE, }; +use reth_tracing::tracing::*; use std::{ collections::{BTreeMap, BTreeSet}, ops::Range, sync::Arc, }; -use tracing::*; +/// Merkle Trie error types +#[allow(missing_docs)] #[derive(Debug, thiserror::Error)] pub enum TrieError { #[error("Some error occurred: {0}")] @@ -410,9 +412,8 @@ mod tests { hex_literal::hex, keccak256, proofs::{genesis_state_root, KeccakHasher, EMPTY_ROOT}, - Address, ChainSpec, + Address, ChainSpec, MAINNET, }; - use reth_staged_sync::utils::chainspec::chain_spec_value_parser; use std::{collections::HashMap, str::FromStr}; use triehash::sec_trie_root; @@ -553,7 +554,7 @@ mod tests { let trie = DBTrieLoader::default(); let db = create_test_rw_db(); let mut tx = Transaction::new(db.as_ref()).unwrap(); - let ChainSpec { genesis, .. } = chain_spec_value_parser("mainnet").unwrap(); + let ChainSpec { genesis, .. } = MAINNET.clone(); // Insert account state for (address, account) in &genesis.alloc { diff --git a/crates/storage/provider/src/utils.rs b/crates/storage/provider/src/utils.rs index 086cc7fc8c..1388870620 100644 --- a/crates/storage/provider/src/utils.rs +++ b/crates/storage/provider/src/utils.rs @@ -4,34 +4,39 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::{provider::ProviderError, Result}; -use reth_primitives::{SealedBlock, U256}; +use reth_primitives::{SealedBlock, TransitionId, U256}; /// Insert block data into corresponding tables. Used mainly for testing & internal tooling. /// /// /// Check parent dependency in [tables::HeaderNumbers] and in [tables::BlockBodies] tables. -/// Inserts blocks data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers], -/// and transactions data to [tables::TxSenders], [tables::Transactions], -/// [tables::BlockBodies] and [tables::BlockBodies] +/// Inserts header data to [tables::CanonicalHeaders], [tables::Headers], [tables::HeaderNumbers]. +/// and transactions data to [tables::TxSenders], [tables::Transactions], [tables::TxHashNumber]. +/// and transition indexes to [tables::BlockTransitionIndex] and [tables::TxTransitionIndex]. +/// And block data [tables::BlockBodies], [tables::BlockBodies] and [tables::BlockWithdrawals]. +/// +/// Return [TransitionId] `(from,to)` pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( tx: &TX, block: &SealedBlock, has_block_reward: bool, parent_tx_num_transition_id: Option<(u64, u64)>, -) -> Result<()> { +) -> Result<(TransitionId, TransitionId)> { tx.put::(block.number, block.hash())?; // Put header with canonical hashes. tx.put::(block.number, block.header.as_ref().clone())?; tx.put::(block.hash(), block.number)?; - tx.put::( - block.number, - if has_block_reward { - U256::ZERO - } else { - U256::from(58_750_000_000_000_000_000_000_u128) + block.difficulty - } - .into(), - )?; + + // total difficulty + let ttd = if block.number == 0 { + U256::ZERO + } else { + let parent_block_number = block.number - 1; + let parent_ttd = tx.get::(parent_block_number)?.unwrap_or_default(); + parent_ttd.0 + block.difficulty + }; + + tx.put::(block.number, ttd.into())?; // insert body ommers data if !block.ommers.is_empty() { @@ -56,7 +61,7 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( .ok_or(ProviderError::BlockTransition { block_number: prev_block_num })?; (prev_body.start_tx_id + prev_body.tx_count, last_transition_id) }; - + let from_transition = transition_id; // insert body data tx.put::( block.number, @@ -65,9 +70,11 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( for transaction in block.body.iter() { let rec_tx = transaction.clone().into_ecrecovered().unwrap(); + let hash = rec_tx.hash(); tx.put::(current_tx_id, rec_tx.signer())?; tx.put::(current_tx_id, rec_tx.into())?; tx.put::(current_tx_id, transition_id)?; + tx.put::(hash, current_tx_id)?; transition_id += 1; current_tx_id += 1; } @@ -88,7 +95,8 @@ pub fn insert_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( } tx.put::(block.number, transition_id)?; - Ok(()) + let to_transition = transition_id; + Ok((from_transition, to_transition)) } /// Inserts canonical block in blockchain. Parent tx num and transition id is taken from @@ -97,6 +105,6 @@ pub fn insert_canonical_block<'a, TX: DbTxMut<'a> + DbTx<'a>>( tx: &TX, block: &SealedBlock, has_block_reward: bool, -) -> Result<()> { +) -> Result<(TransitionId, TransitionId)> { insert_block(tx, block, has_block_reward, None) }