From 49292091dd97c9a878ed47cab9070a1eff24099e Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 13 Feb 2023 19:10:58 -0800 Subject: [PATCH] Revert "feat: add `reth dump-stage` command" (#1327) --- bin/reth/src/cli.rs | 6 +- bin/reth/src/db/mod.rs | 4 +- bin/reth/src/dump_stage/mod.rs | 203 ------------------ bin/reth/src/lib.rs | 1 - crates/stages/src/stages/execution.rs | 39 ++-- crates/storage/db/src/abstraction/database.rs | 3 +- crates/storage/db/src/abstraction/mock.rs | 8 +- crates/storage/db/src/abstraction/table.rs | 60 +----- .../storage/db/src/abstraction/transaction.rs | 2 - .../storage/db/src/implementation/mdbx/tx.rs | 8 +- crates/storage/db/src/tables/models/blocks.rs | 6 - crates/storage/provider/src/transaction.rs | 11 - 12 files changed, 24 insertions(+), 327 deletions(-) delete mode 100644 bin/reth/src/dump_stage/mod.rs diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index e984a0b937..ed9d0f9d5b 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use crate::{ chain, db, dirs::{LogsDir, PlatformPath}, - dump_stage, node, p2p, + node, p2p, runner::CliRunner, stage, test_eth_chain, test_vectors, }; @@ -30,7 +30,6 @@ pub fn run() -> eyre::Result<()> { Commands::Import(command) => runner.run_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_until_ctrl_c(command.execute()), Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()), - Commands::DumpStage(command) => runner.run_until_ctrl_c(command.execute()), Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()), @@ -60,9 +59,6 @@ pub enum Commands { /// a lot of memory to store all the data. #[command(name = "stage")] Stage(stage::Command), - /// Dumps a stage from a range into a new database. - #[command(name = "dump-stage")] - DumpStage(dump_stage::Command), /// P2P Debugging utilities #[command(name = "p2p")] P2P(p2p::Command), diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 4e215ef22e..506362d75b 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -189,13 +189,13 @@ impl Command { } /// Wrapper over DB that implements many useful DB queries. -pub(crate) struct DbTool<'a, DB: Database> { +struct DbTool<'a, DB: Database> { pub(crate) db: &'a DB, } impl<'a, DB: Database> DbTool<'a, DB> { /// Takes a DB where the tables have already been created. - pub(crate) fn new(db: &'a DB) -> eyre::Result { + fn new(db: &'a DB) -> eyre::Result { Ok(Self { db }) } diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs deleted file mode 100644 index 2617e8b21d..0000000000 --- a/bin/reth/src/dump_stage/mod.rs +++ /dev/null @@ -1,203 +0,0 @@ -//! Database debugging tool -use crate::dirs::{DbPath, PlatformPath}; -use clap::Parser; -use eyre::Result; -use reth_db::{ - cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, -}; -use reth_provider::Transaction; -use reth_staged_sync::utils::init::init_db; -use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput}; -use std::ops::DerefMut; -use tracing::info; - -use crate::db::DbTool; - -/// `reth dump-stage` command -#[derive(Debug, Parser)] -pub struct Command { - /// The path to the database folder. - /// - /// Defaults to the OS-specific data directory: - /// - /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` - /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` - /// - macOS: `$HOME/Library/Application Support/reth/db` - #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)] - db: PlatformPath, - - #[clap(subcommand)] - command: Stages, -} - -/// Supported stages to be dumped -#[derive(Debug, Clone, Parser)] -pub enum Stages { - /// Execution stage. - Execution(StageCommand), -} - -/// Stage command that takes a range -#[derive(Debug, Clone, Parser)] -pub struct StageCommand { - /// The path to the new database folder. - /// - /// Defaults to the OS-specific data directory: - /// - /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` - /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` - /// - macOS: `$HOME/Library/Application Support/reth/db` - #[arg(long, value_name = "OUTPUT_PATH", verbatim_doc_comment, default_value_t)] - output_db: PlatformPath, - /// From which block. - #[arg(long, short)] - from: u64, - /// To which block. - #[arg(long, short)] - to: u64, - /// If passed, it will dry-run a stage execution from the newly created database right after - /// dumping. - #[arg(long, short, default_value = "false")] - dry_run: bool, -} - -impl Command { - /// Execute `dump-stage` command - pub async fn execute(&self) -> eyre::Result<()> { - std::fs::create_dir_all(&self.db)?; - - // TODO: Auto-impl for Database trait - let db = reth_db::mdbx::Env::::open( - self.db.as_ref(), - reth_db::mdbx::EnvKind::RW, - )?; - - let mut tool = DbTool::new(&db)?; - - match &self.command { - Stages::Execution(StageCommand { output_db, from, to, dry_run, .. }) => { - dump_execution_stage(&mut tool, *from, *to, output_db, *dry_run).await? - } - } - - Ok(()) - } -} - -async fn dump_execution_stage( - db_tool: &mut DbTool<'_, DB>, - from: u64, - to: u64, - output_db: &PlatformPath, - dry_run: bool, -) -> Result<()> { - assert!(from < to, "FROM block should be bigger than TO block."); - - info!(target: "reth::cli", "Creating separate db at {}", output_db); - - let output_db = init_db(output_db)?; - - // Copy input tables. We're not sharing the transaction in case the memory grows too much. - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) - })??; - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) - })??; - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) - })??; - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) - })??; - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) - })??; - output_db.update(|tx| { - tx.import_table_with_range::( - &db_tool.db.tx()?, - Some(from - 1), - to + 1, - ) - })??; - - // Find range of transactions that need to be copied over - let (from_tx, to_tx) = db_tool.db.view(|read_tx| { - let mut read_cursor = read_tx.cursor_read::()?; - let (_, from_block) = - read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?; - let (_, to_block) = - read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?; - - Ok::<(u64, u64), eyre::ErrReport>(( - from_block.start_tx_id, - to_block.start_tx_id + to_block.tx_count, - )) - })??; - - output_db.update(|tx| { - tx.import_table_with_range::( - &db_tool.db.tx()?, - Some(from_tx), - to_tx, - ) - })??; - output_db.update(|tx| { - tx.import_table_with_range::(&db_tool.db.tx()?, Some(from_tx), to_tx) - })??; - - // Find the latest block to unwind from - let (tip_block_number, _) = db_tool - .db - .view(|tx| tx.cursor_read::()?.last())?? - .expect("some"); - - // Dry-run an unwind to FROM block, so we can get the PlainStorageState and - // PlainAccountState safely. There might be some state dependency from an address - // which hasn't been changed in the given range. - { - let mut unwind_tx = Transaction::new(db_tool.db)?; - let mut exec_stage = ExecutionStage::default(); - - exec_stage - .unwind( - &mut unwind_tx, - UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None }, - ) - .await?; - - let unwind_inner_tx = unwind_tx.deref_mut(); - - output_db - .update(|tx| tx.import_dupsort::(unwind_inner_tx))??; - output_db - .update(|tx| tx.import_table::(unwind_inner_tx))??; - output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; - - // We don't want to actually commit these changes to our original database. - unwind_tx.drop()?; - } - - // Try to re-execute the stage without committing - if dry_run { - info!(target: "reth::cli", "Executing stage. [dry-run]"); - - let mut tx = Transaction::new(&output_db)?; - - let mut exec_stage = ExecutionStage::default(); - exec_stage - .execute( - &mut tx, - reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), to)), - stage_progress: Some(from), - }, - ) - .await?; - - tx.drop()?; - info!(target: "reth::cli", "Success."); - } - - Ok(()) -} diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 79faa46225..244d5ef62f 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -12,7 +12,6 @@ pub mod chain; pub mod cli; pub mod db; pub mod dirs; -pub mod dump_stage; pub mod node; pub mod p2p; pub mod prometheus_exporter; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 549ecd797e..56a8637733 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -332,6 +332,21 @@ impl Stage for ExecutionStage { return Ok(UnwindOutput { stage_progress: input.unwind_to }) } + // get all batches for account change + // Check if walk and walk_dup would do the same thing + let account_changeset_batch = account_changeset + .walk_range(from_transition_rev..to_transition_rev)? + .collect::, _>>()?; + + // revert all changes to PlainState + for (_, changeset) in account_changeset_batch.into_iter().rev() { + if let Some(account_info) = changeset.info { + tx.put::(changeset.address, account_info)?; + } else { + tx.delete::(changeset.address, None)?; + } + } + // get all batches for storage change let storage_changeset_batch = storage_changeset .walk_range( @@ -345,34 +360,14 @@ impl Stage for ExecutionStage { for (key, storage) in storage_changeset_batch.into_iter().rev() { let address = key.address(); - if let Some(v) = plain_storage_cursor.seek_by_key_subkey(address, storage.key)? { - if v.key == storage.key { - plain_storage_cursor.delete_current()?; - } + if plain_storage_cursor.seek_by_key_subkey(address, storage.key)?.is_some() { + plain_storage_cursor.delete_current()?; } if storage.value != U256::ZERO { plain_storage_cursor.upsert(address, storage)?; } } - // Get all batches for account change - // Check if walk and walk_dup would do the same thing - let account_changeset_batch = account_changeset - .walk_range(from_transition_rev..to_transition_rev)? - .collect::, _>>()?; - - // revert all changes to PlainState - // Needs to happen after the storage unwind, so we don't end up inserting a storage value - // into a deleted key (eg. contract creation) - for (_, changeset) in account_changeset_batch.into_iter().rev() { - if let Some(account_info) = changeset.info { - tx.put::(changeset.address, account_info)?; - } else { - tx.delete::(changeset.address, None)?; - tx.delete::(changeset.address, None)?; - } - } - // Discard unwinded changesets let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?; while let Some((transition_id, _)) = rev_acc_changeset_walker.next().transpose()? { diff --git a/crates/storage/db/src/abstraction/database.rs b/crates/storage/db/src/abstraction/database.rs index 2ad02e9c31..4de1f4c540 100644 --- a/crates/storage/db/src/abstraction/database.rs +++ b/crates/storage/db/src/abstraction/database.rs @@ -1,6 +1,5 @@ use crate::{ common::{Bounds, Sealed}, - table::TableImporter, transaction::{DbTx, DbTxMut}, Error, }; @@ -14,7 +13,7 @@ pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + S /// RO database transaction type TX: DbTx<'a> + Send + Sync; /// RW database transaction - type TXMut: DbTxMut<'a> + DbTx<'a> + TableImporter<'a> + Send + Sync; + type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync; } /// Main Database trait that spawns transactions to be executed. diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index 3160a5c54b..8974a22790 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -8,7 +8,7 @@ use crate::{ ReverseWalker, Walker, }, database::{Database, DatabaseGAT}, - table::{DupSort, Table, TableImporter}, + table::{DupSort, Table}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, Error, }; @@ -63,10 +63,6 @@ impl<'a> DbTx<'a> for TxMock { todo!() } - fn drop(self) { - todo!() - } - fn cursor_read(&self) -> Result<>::Cursor, Error> { todo!() } @@ -100,8 +96,6 @@ impl<'a> DbTxMut<'a> for TxMock { } } -impl<'a> TableImporter<'a> for TxMock {} - /// CUrsor that iterates over table pub struct CursorMock { _cursor: u32, diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs index 691d12db27..fb1583a47f 100644 --- a/crates/storage/db/src/abstraction/table.rs +++ b/crates/storage/db/src/abstraction/table.rs @@ -1,8 +1,4 @@ -use crate::{ - abstraction::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, - transaction::{DbTx, DbTxMut}, - Error, -}; +use crate::Error; use bytes::Bytes; use serde::Serialize; use std::{ @@ -80,57 +76,3 @@ pub trait DupSort: Table { /// Upstream docs: type SubKey: Key; } - -/// Allows duplicating tables across databases -pub trait TableImporter<'tx>: for<'a> DbTxMut<'a> { - /// Imports all table data from another transaction. - fn import_table>(&self, source_tx: &R) -> Result<(), Error> { - let mut destination_cursor = self.cursor_write::()?; - - for kv in source_tx.cursor_read::()?.walk(None)? { - let (k, v) = kv?; - destination_cursor.append(k, v)?; - } - - Ok(()) - } - - /// Imports table data from another transaction within a range. - fn import_table_with_range>( - &self, - source_tx: &R, - from: Option<::Key>, - to: ::Key, - ) -> Result<(), Error> { - let mut destination_cursor = self.cursor_write::()?; - let mut source_cursor = source_tx.cursor_read::()?; - - for row in source_cursor.walk(from)? { - let (key, value) = row?; - let finished = key == to; - - destination_cursor.append(key, value)?; - - if finished { - break - } - } - - Ok(()) - } - - /// Imports all dupsort data from another transaction. - fn import_dupsort>(&self, source_tx: &R) -> Result<(), Error> { - let mut destination_cursor = self.cursor_dup_write::()?; - let mut cursor = source_tx.cursor_dup_read::()?; - - while let Some((k, _)) = cursor.next_no_dup()? { - for kv in cursor.walk_dup(Some(k), None)? { - let (k, v) = kv?; - destination_cursor.append_dup(k, v)?; - } - } - - Ok(()) - } -} diff --git a/crates/storage/db/src/abstraction/transaction.rs b/crates/storage/db/src/abstraction/transaction.rs index 832a5e926e..81798cb08a 100644 --- a/crates/storage/db/src/abstraction/transaction.rs +++ b/crates/storage/db/src/abstraction/transaction.rs @@ -39,8 +39,6 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> { /// Commit for read only transaction will consume and free transaction and allows /// freeing of memory pages fn commit(self) -> Result; - /// Drops transaction - fn drop(self); /// Iterate over read only values in table. fn cursor_read(&self) -> Result<>::Cursor, Error>; /// Iterate over read only values in dup sorted table. diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index a525f041d5..bfb1cd5d67 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -2,7 +2,7 @@ use super::cursor::Cursor; use crate::{ - table::{Compress, DupSort, Encode, Table, TableImporter}, + table::{Compress, DupSort, Encode, Table}, tables::utils::decode_one, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, Error, @@ -57,8 +57,6 @@ impl<'a, K: TransactionKind, E: EnvironmentKind> DbTxMutGAT<'a> for Tx<'_, K, E> type DupCursorMut = Cursor<'a, RW, T>; } -impl<'a, E: EnvironmentKind> TableImporter<'a> for Tx<'_, RW, E> {} - impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> { // Iterate over read only values in database. fn cursor_read(&self) -> Result<>::Cursor, Error> { @@ -77,10 +75,6 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> { result } - fn drop(self) { - drop(self.inner) - } - fn get(&self, key: T::Key) -> Result::Value>, Error> { self.inner .get( diff --git a/crates/storage/db/src/tables/models/blocks.rs b/crates/storage/db/src/tables/models/blocks.rs index a9f60ca222..8fa20190f6 100644 --- a/crates/storage/db/src/tables/models/blocks.rs +++ b/crates/storage/db/src/tables/models/blocks.rs @@ -97,12 +97,6 @@ impl From<(u64, H256)> for BlockNumHash { } } -impl From for BlockNumHash { - fn from(tpl: u64) -> Self { - BlockNumHash((tpl, H256::default())) - } -} - impl Encode for BlockNumHash { type Encoded = [u8; 40]; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index d927957849..798b2c6fe6 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -88,17 +88,6 @@ where Ok(success) } - /// Drops the current inner transaction and open a new one. - pub fn drop(&mut self) -> Result<(), DbError> { - if let Some(tx) = self.tx.take() { - drop(tx); - } - - self.tx = Some(self.db.tx_mut()?); - - Ok(()) - } - /// Open a new inner transaction. pub fn open(&mut self) -> Result<(), DbError> { self.tx = Some(self.db.tx_mut()?);