cut unnecessary methods

This commit is contained in:
Dan Cline
2024-02-14 16:16:05 -05:00
parent 9afad19fa0
commit 4fbee52d32
9 changed files with 81 additions and 242 deletions

View File

@@ -6,7 +6,6 @@ use crate::{
DatabaseArgs, StageEnum,
},
dirs::{DataDirPath, MaybePlatformPath},
utils::DbTool,
};
use clap::Parser;
use reth_db::{
@@ -14,6 +13,7 @@ use reth_db::{
};
use reth_node_core::init::{insert_genesis_header, insert_genesis_state};
use reth_primitives::{fs, stage::StageId, ChainSpec};
use reth_provider::DatabaseProviderRW;
use std::sync::Arc;
use tracing::info;
@@ -59,11 +59,13 @@ impl Command {
let db =
open_db(db_path.as_ref(), DatabaseArguments::default().log_level(self.db.log_level))?;
let tool = DbTool::new(&db, self.chain.clone())?;
let provider_rw =
DatabaseProviderRW::<DatabaseEnv>::with_tx(db.tx_mut()?, self.chain.clone(), &None)?;
tool.db.update(|tx| {
provider_rw.update(|provider| {
match &self.stage {
StageEnum::Bodies => {
let tx = provider.tx_mut();
tx.clear::<tables::BlockBodyIndices>()?;
tx.clear::<tables::Transactions>()?;
tx.clear::<tables::TransactionBlock>()?;
@@ -73,6 +75,7 @@ impl Command {
insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
}
StageEnum::Senders => {
let tx = provider.tx_mut();
tx.clear::<tables::TxSenders>()?;
tx.put::<tables::SyncStage>(
StageId::SenderRecovery.to_string(),
@@ -80,6 +83,7 @@ impl Command {
)?;
}
StageEnum::Execution => {
let tx = provider.tx_mut();
tx.clear::<tables::PlainAccountState>()?;
tx.clear::<tables::PlainStorageState>()?;
tx.clear::<tables::AccountChangeSet>()?;
@@ -90,9 +94,11 @@ impl Command {
StageId::Execution.to_string(),
Default::default(),
)?;
insert_genesis_state::<DatabaseEnv>(tx, self.chain.genesis())?;
insert_genesis_state(provider, self.chain.genesis())?;
}
StageEnum::AccountHashing => {
let tx = provider.tx_mut();
tx.clear::<tables::HashedAccount>()?;
tx.put::<tables::SyncStage>(
StageId::AccountHashing.to_string(),
@@ -100,6 +106,7 @@ impl Command {
)?;
}
StageEnum::StorageHashing => {
let tx = provider.tx_mut();
tx.clear::<tables::HashedStorage>()?;
tx.put::<tables::SyncStage>(
StageId::StorageHashing.to_string(),
@@ -107,6 +114,8 @@ impl Command {
)?;
}
StageEnum::Hashing => {
let tx = provider.tx_mut();
// Clear hashed accounts
tx.clear::<tables::HashedAccount>()?;
tx.put::<tables::SyncStage>(
@@ -122,6 +131,7 @@ impl Command {
)?;
}
StageEnum::Merkle => {
let tx = provider.tx_mut();
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
tx.put::<tables::SyncStage>(
@@ -138,6 +148,7 @@ impl Command {
)?;
}
StageEnum::AccountHistory | StageEnum::StorageHistory => {
let tx = provider.tx_mut();
tx.clear::<tables::AccountHistory>()?;
tx.clear::<tables::StorageHistory>()?;
tx.put::<tables::SyncStage>(
@@ -150,6 +161,8 @@ impl Command {
)?;
}
StageEnum::TotalDifficulty => {
let tx = provider.tx_mut();
tx.clear::<tables::HeaderTD>()?;
tx.put::<tables::SyncStage>(
StageId::TotalDifficulty.to_string(),
@@ -158,6 +171,7 @@ impl Command {
insert_genesis_header::<DatabaseEnv>(tx, self.chain)?;
}
StageEnum::TxLookup => {
let tx = provider.tx_mut();
tx.clear::<tables::TxHashNumber>()?;
tx.put::<tables::SyncStage>(
StageId::TransactionLookup.to_string(),
@@ -171,6 +185,7 @@ impl Command {
}
}
let tx = provider.tx_mut();
tx.put::<tables::SyncStage>(StageId::Finish.to_string(), Default::default())?;
Ok::<_, eyre::Error>(())

View File

@@ -13,7 +13,7 @@ use reth_primitives::{
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
BundleStateWithReceipts, DatabaseProviderRW, HashingWriter, HistoryWriter, OriginalValuesKnown,
ProviderError, ProviderFactory,
ProviderError, ProviderFactory, StateWriter,
};
use std::{
collections::{BTreeMap, HashMap},
@@ -75,14 +75,13 @@ pub fn init_genesis<DB: Database>(
let provider_rw = factory.provider_rw()?;
insert_genesis_hashes(&provider_rw, genesis)?;
insert_genesis_history(&provider_rw, genesis)?;
insert_genesis_state(&provider_rw, genesis)?;
provider_rw.commit()?;
// Insert header
let tx = db.tx_mut()?;
insert_genesis_header::<DB>(&tx, chain.clone())?;
insert_genesis_state::<DB>(&tx, genesis)?;
// insert sync stage
for stage in StageId::ALL.iter() {
tx.put::<tables::SyncStage>(stage.to_string(), Default::default())?;
@@ -94,7 +93,7 @@ pub fn init_genesis<DB: Database>(
/// Inserts the genesis state into the database.
pub fn insert_genesis_state<DB: Database>(
tx: &<DB as Database>::TXMut,
provider: &DatabaseProviderRW<DB>,
genesis: &reth_primitives::Genesis,
) -> ProviderResult<()> {
let mut state_init: BundleStateInit = HashMap::new();
@@ -153,7 +152,7 @@ pub fn insert_genesis_state<DB: Database>(
0,
);
bundle.write_to_db(tx, OriginalValuesKnown::Yes)?;
provider.write_state(bundle, OriginalValuesKnown::Yes)?;
Ok(())
}

View File

@@ -1,10 +1,3 @@
use crate::{StateChanges, StateReverts};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{
logs_bloom,
revm::compat::{into_reth_acc, into_revm_acc},
@@ -289,46 +282,6 @@ impl BundleStateWithReceipts {
// swap bundles
std::mem::swap(&mut self.bundle, &mut other)
}
/// Write the [BundleStateWithReceipts] to the database.
///
/// `is_value_known` should be set to `Not` if the [BundleStateWithReceipts] has some of its
/// state detached, This would make some original values not known.
pub fn write_to_db<TX: DbTxMut + DbTx>(
self,
tx: &TX,
is_value_known: OriginalValuesKnown,
) -> Result<(), DatabaseError> {
let (plain_state, reverts) = self.bundle.into_plain_state_and_reverts(is_value_known);
StateReverts(reverts).write_to_db(tx, self.first_block)?;
// write receipts
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut receipts_cursor = tx.cursor_write::<tables::Receipts>()?;
for (idx, receipts) in self.receipts.into_iter().enumerate() {
if !receipts.is_empty() {
let block_number = self.first_block + idx as u64;
let (_, body_indices) =
bodies_cursor.seek_exact(block_number)?.unwrap_or_else(|| {
let last_available = bodies_cursor.last().ok().flatten().map(|(number, _)| number);
panic!("body indices for block {block_number} must exist. last available block number: {last_available:?}");
});
let first_tx_index = body_indices.first_tx_num();
for (tx_idx, receipt) in receipts.into_iter().enumerate() {
if let Some(receipt) = receipt {
receipts_cursor.append(first_tx_index + tx_idx as u64, receipt)?;
}
}
}
}
StateChanges(plain_state).write_to_db(tx)?;
Ok(())
}
}
#[cfg(test)]
@@ -344,7 +297,7 @@ mod tests {
models::{AccountBeforeTx, BlockNumberAddress},
tables,
test_utils::create_test_rw_db,
transaction::DbTx,
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{
keccak256, revm::compat::into_reth_acc, Address, Receipt, Receipts, StorageEntry, B256,

View File

@@ -2,12 +2,10 @@
//! This module contains all the logic related to bundle state.
mod bundle_state_with_receipts;
mod hashed_state_changes;
mod state_changes;
mod state_reverts;
pub use bundle_state_with_receipts::{
AccountRevertInit, BundleStateInit, BundleStateWithReceipts, OriginalValuesKnown, RevertsInit,
};
pub use hashed_state_changes::HashedStateChanges;
pub use state_changes::StateChanges;
pub use state_reverts::{StateReverts, StorageRevertsIter};
pub use state_reverts::StorageRevertsIter;

View File

@@ -1,82 +0,0 @@
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{revm::compat::into_reth_acc, Bytecode, StorageEntry, U256};
use revm::db::states::{PlainStorageChangeset, StateChangeset};
/// A change to the state of the world.
#[derive(Debug, Default)]
pub struct StateChanges(pub StateChangeset);
impl From<StateChangeset> for StateChanges {
fn from(revm: StateChangeset) -> Self {
Self(revm)
}
}
impl StateChanges {
/// Write the bundle state to the database.
pub fn write_to_db<TX: DbTxMut + DbTx>(mut self, tx: &TX) -> Result<(), DatabaseError> {
// sort all entries so they can be written to database in more performant way.
// and take smaller memory footprint.
self.0.accounts.par_sort_by_key(|a| a.0);
self.0.storage.par_sort_by_key(|a| a.address);
self.0.contracts.par_sort_by_key(|a| a.0);
// Write new account state
tracing::trace!(target: "provider::bundle_state", len = self.0.accounts.len(), "Writing new account state");
let mut accounts_cursor = tx.cursor_write::<tables::PlainAccountState>()?;
// write account to database.
for (address, account) in self.0.accounts.into_iter() {
if let Some(account) = account {
tracing::trace!(target: "provider::bundle_state", ?address, "Updating plain state account");
accounts_cursor.upsert(address, into_reth_acc(account))?;
} else if accounts_cursor.seek_exact(address)?.is_some() {
tracing::trace!(target: "provider::bundle_state", ?address, "Deleting plain state account");
accounts_cursor.delete_current()?;
}
}
// Write bytecode
tracing::trace!(target: "provider::bundle_state", len = self.0.contracts.len(), "Writing bytecodes");
let mut bytecodes_cursor = tx.cursor_write::<tables::Bytecodes>()?;
for (hash, bytecode) in self.0.contracts.into_iter() {
bytecodes_cursor.upsert(hash, Bytecode(bytecode))?;
}
// Write new storage state and wipe storage if needed.
tracing::trace!(target: "provider::bundle_state", len = self.0.storage.len(), "Writing new storage state");
let mut storages_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
for PlainStorageChangeset { address, wipe_storage, storage } in self.0.storage.into_iter() {
// Wiping of storage.
if wipe_storage && storages_cursor.seek_exact(address)?.is_some() {
storages_cursor.delete_current_duplicates()?;
}
// cast storages to B256.
let mut storage = storage
.into_iter()
.map(|(k, value)| StorageEntry { key: k.into(), value })
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.key);
for entry in storage.into_iter() {
tracing::trace!(target: "provider::bundle_state", ?address, ?entry.key, "Updating plain state storage");
if let Some(db_entry) = storages_cursor.seek_by_key_subkey(address, entry.key)? {
if db_entry.key == entry.key {
storages_cursor.delete_current()?;
}
}
if entry.value != U256::ZERO {
storages_cursor.upsert(address, entry)?;
}
}
}
Ok(())
}
}

View File

@@ -1,95 +1,7 @@
use rayon::slice::ParallelSliceMut;
use reth_db::{
cursor::{DbCursorRO, DbDupCursorRO, DbDupCursorRW},
models::{AccountBeforeTx, BlockNumberAddress},
tables,
transaction::{DbTx, DbTxMut},
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::{revm::compat::into_reth_acc, BlockNumber, StorageEntry, B256, U256};
use revm::db::states::{PlainStateReverts, PlainStorageRevert, RevertToSlot};
use reth_primitives::{B256, U256};
use revm::db::states::RevertToSlot;
use std::iter::Peekable;
/// Revert of the state.
#[derive(Debug, Default)]
pub struct StateReverts(pub PlainStateReverts);
impl From<PlainStateReverts> for StateReverts {
fn from(revm: PlainStateReverts) -> Self {
Self(revm)
}
}
impl StateReverts {
/// Write reverts to database.
///
/// Note:: Reverts will delete all wiped storage from plain state.
pub fn write_to_db<TX: DbTxMut + DbTx>(
self,
tx: &TX,
first_block: BlockNumber,
) -> Result<(), DatabaseError> {
// Write storage changes
tracing::trace!(target: "provider::reverts", "Writing storage changes");
let mut storages_cursor = tx.cursor_dup_write::<tables::PlainStorageState>()?;
let mut storage_changeset_cursor = tx.cursor_dup_write::<tables::StorageChangeSet>()?;
for (block_index, mut storage_changes) in self.0.storage.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
tracing::trace!(target: "provider::reverts", block_number, "Writing block change");
// sort changes by address.
storage_changes.par_sort_unstable_by_key(|a| a.address);
for PlainStorageRevert { address, wiped, storage_revert } in storage_changes.into_iter()
{
let storage_id = BlockNumberAddress((block_number, address));
let mut storage = storage_revert
.into_iter()
.map(|(k, v)| (B256::new(k.to_be_bytes()), v))
.collect::<Vec<_>>();
// sort storage slots by key.
storage.par_sort_unstable_by_key(|a| a.0);
// If we are writing the primary storage wipe transition, the pre-existing plain
// storage state has to be taken from the database and written to storage history.
// See [StorageWipe::Primary] for more details.
let mut wiped_storage = Vec::new();
if wiped {
tracing::trace!(target: "provider::reverts", ?address, "Wiping storage");
if let Some((_, entry)) = storages_cursor.seek_exact(address)? {
wiped_storage.push((entry.key, entry.value));
while let Some(entry) = storages_cursor.next_dup_val()? {
wiped_storage.push((entry.key, entry.value))
}
}
}
tracing::trace!(target: "provider::reverts", ?address, ?storage, "Writing storage reverts");
for (key, value) in StorageRevertsIter::new(storage, wiped_storage) {
storage_changeset_cursor.append_dup(storage_id, StorageEntry { key, value })?;
}
}
}
// Write account changes
tracing::trace!(target: "provider::reverts", "Writing account changes");
let mut account_changeset_cursor = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
for (block_index, mut account_block_reverts) in self.0.accounts.into_iter().enumerate() {
let block_number = first_block + block_index as BlockNumber;
// Sort accounts by address.
account_block_reverts.par_sort_by_key(|a| a.0);
for (address, info) in account_block_reverts {
account_changeset_cursor.append_dup(
block_number,
AccountBeforeTx { address, info: info.map(into_reth_acc) },
)?;
}
}
Ok(())
}
}
/// Iterator over storage reverts.
/// See [StorageRevertsIter::next] for more details.
pub struct StorageRevertsIter<R: Iterator, W: Iterator> {
@@ -97,6 +9,18 @@ pub struct StorageRevertsIter<R: Iterator, W: Iterator> {
wiped: Peekable<W>,
}
impl<R, W, A, B> std::fmt::Debug for StorageRevertsIter<R, W>
where
R: Iterator<Item = A>,
W: Iterator<Item = B>,
A: std::fmt::Debug,
B: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageRevertsIter").finish()
}
}
impl<R: Iterator, W: Iterator> StorageRevertsIter<R, W>
where
R: Iterator<Item = (B256, RevertToSlot)>,

View File

@@ -33,7 +33,7 @@ pub mod chain;
pub use chain::{Chain, DisplayBlocksChain};
pub mod bundle_state;
pub use bundle_state::{BundleStateWithReceipts, OriginalValuesKnown, StateChanges, StateReverts};
pub use bundle_state::{BundleStateWithReceipts, OriginalValuesKnown};
pub(crate) fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
let start = match bounds.start_bound() {

View File

@@ -118,13 +118,11 @@ impl<DB: Database> ProviderFactory<DB> {
/// open.
#[track_caller]
pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<DB>> {
let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone());
if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}
Ok(DatabaseProviderRW(provider))
DatabaseProviderRW::with_tx(
self.db.tx_mut()?,
self.chain_spec.clone(),
&self.snapshot_provider,
)
}
/// Storage provider for latest block

View File

@@ -74,6 +74,40 @@ pub type DatabaseProviderRO<DB> = DatabaseProvider<<DB as Database>::TX>;
#[derive(Debug)]
pub struct DatabaseProviderRW<DB: Database>(pub DatabaseProvider<<DB as Database>::TXMut>);
impl<DB> DatabaseProviderRW<DB>
where
DB: Database,
{
/// Returns a provider with the given `DbTxMut` inside, which allows fetching and updating
/// data from the database using different types of providers.
#[track_caller]
pub fn with_tx(
tx: <DB as Database>::TXMut,
chain_spec: Arc<ChainSpec>,
snapshot_provider: &Option<Arc<SnapshotProvider>>,
) -> ProviderResult<DatabaseProviderRW<DB>> {
let mut provider = DatabaseProvider::new_rw(tx, chain_spec.clone());
if let Some(snapshot_provider) = snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}
Ok(DatabaseProviderRW(provider))
}
/// Takes a function and passes a write-read transaction into it, making sure it's committed at
/// the end of the execution.
pub fn update<T, F>(mut self, f: F) -> Result<T, DatabaseError>
where
F: FnOnce(&mut Self) -> T,
{
let res = f(&mut self);
self.commit()?;
Ok(res)
}
}
impl<DB: Database> Deref for DatabaseProviderRW<DB> {
type Target = DatabaseProvider<<DB as Database>::TXMut>;
@@ -90,7 +124,7 @@ impl<DB: Database> DerefMut for DatabaseProviderRW<DB> {
impl<DB: Database> DatabaseProviderRW<DB> {
/// Commit database transaction
pub fn commit(self) -> ProviderResult<bool> {
pub fn commit(self) -> Result<bool, DatabaseError> {
self.0.commit()
}
@@ -375,7 +409,7 @@ impl<TX: DbTx> DatabaseProvider<TX> {
impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
/// Commit database transaction.
pub fn commit(self) -> ProviderResult<bool> {
pub fn commit(self) -> Result<bool, DatabaseError> {
Ok(self.tx.commit()?)
}