From 313bf28501bd994e0965b7fa758496e872699f0e Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 15 Feb 2023 01:23:33 +0800 Subject: [PATCH] feat: add `reth dump-stage` command (#1328) --- bin/reth/src/cli.rs | 6 +- bin/reth/src/db/mod.rs | 4 +- bin/reth/src/dump_stage/execution.rs | 145 ++++++++++++++++++ bin/reth/src/dump_stage/hashing_storage.rs | 85 ++++++++++ bin/reth/src/dump_stage/mod.rs | 123 +++++++++++++++ bin/reth/src/lib.rs | 1 + crates/stages/src/stages/execution.rs | 6 +- crates/storage/db/src/abstraction/database.rs | 3 +- crates/storage/db/src/abstraction/mock.rs | 8 +- crates/storage/db/src/abstraction/table.rs | 60 +++++++- .../storage/db/src/abstraction/transaction.rs | 2 + .../storage/db/src/implementation/mdbx/tx.rs | 8 +- crates/storage/db/src/tables/models/blocks.rs | 6 + crates/storage/provider/src/transaction.rs | 11 ++ 14 files changed, 459 insertions(+), 9 deletions(-) create mode 100644 bin/reth/src/dump_stage/execution.rs create mode 100644 bin/reth/src/dump_stage/hashing_storage.rs create mode 100644 bin/reth/src/dump_stage/mod.rs diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index ed9d0f9d5b..e984a0b937 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use crate::{ chain, db, dirs::{LogsDir, PlatformPath}, - node, p2p, + dump_stage, node, p2p, runner::CliRunner, stage, test_eth_chain, test_vectors, }; @@ -30,6 +30,7 @@ pub fn run() -> eyre::Result<()> { Commands::Import(command) => runner.run_until_ctrl_c(command.execute()), Commands::Db(command) => runner.run_until_ctrl_c(command.execute()), Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()), + Commands::DumpStage(command) => runner.run_until_ctrl_c(command.execute()), Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestEthChain(command) => 
runner.run_until_ctrl_c(command.execute()), @@ -59,6 +60,9 @@ pub enum Commands { /// a lot of memory to store all the data. #[command(name = "stage")] Stage(stage::Command), + /// Dumps a stage from a range into a new database. + #[command(name = "dump-stage")] + DumpStage(dump_stage::Command), /// P2P Debugging utilities #[command(name = "p2p")] P2P(p2p::Command), diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 506362d75b..4e215ef22e 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -189,13 +189,13 @@ impl Command { } /// Wrapper over DB that implements many useful DB queries. -struct DbTool<'a, DB: Database> { +pub(crate) struct DbTool<'a, DB: Database> { pub(crate) db: &'a DB, } impl<'a, DB: Database> DbTool<'a, DB> { /// Takes a DB where the tables have already been created. - fn new(db: &'a DB) -> eyre::Result { + pub(crate) fn new(db: &'a DB) -> eyre::Result { Ok(Self { db }) } diff --git a/bin/reth/src/dump_stage/execution.rs b/bin/reth/src/dump_stage/execution.rs new file mode 100644 index 0000000000..49d6de7de3 --- /dev/null +++ b/bin/reth/src/dump_stage/execution.rs @@ -0,0 +1,145 @@ +use crate::{ + db::DbTool, + dirs::{DbPath, PlatformPath}, + dump_stage::setup, +}; +use eyre::Result; +use reth_db::{ + cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, +}; +use reth_provider::Transaction; +use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput}; +use std::ops::DerefMut; +use tracing::info; + +pub(crate) async fn dump_execution_stage( + db_tool: &mut DbTool<'_, DB>, + from: u64, + to: u64, + output_db: &PlatformPath, + should_run: bool, +) -> Result<()> { + let (output_db, tip_block_number) = setup::(from, to, output_db, db_tool)?; + + import_tables_with_range::(&output_db, db_tool, from, to)?; + + unwind_and_copy::(db_tool, from, tip_block_number, &output_db).await?; + + if should_run { + dry_run(output_db, to, from).await?; + } + + Ok(()) +} + +/// Imports all the 
tables that can be copied over a range. +fn import_tables_with_range( + output_db: &reth_db::mdbx::Env, + db_tool: &mut DbTool<'_, DB>, + from: u64, + to: u64, +) -> eyre::Result<()> { + // We're not sharing the transaction in case the memory grows too much. + + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from), to) + })??; + + // Find range of transactions that need to be copied over + let (from_tx, to_tx) = db_tool.db.view(|read_tx| { + let mut read_cursor = read_tx.cursor_read::()?; + let (_, from_block) = + read_cursor.seek(from)?.ok_or(eyre::eyre!("BlockBody {from} does not exist."))?; + let (_, to_block) = + read_cursor.seek(to)?.ok_or(eyre::eyre!("BlockBody {to} does not exist."))?; + + Ok::<(u64, u64), eyre::ErrReport>(( + from_block.start_tx_id, + to_block.start_tx_id + to_block.tx_count, + )) + })??; + + output_db.update(|tx| { + tx.import_table_with_range::( + &db_tool.db.tx()?, + Some(from_tx), + to_tx, + ) + })??; + + output_db.update(|tx| { + tx.import_table_with_range::(&db_tool.db.tx()?, Some(from_tx), to_tx) + })??; + + Ok(()) +} + +/// Dry-run an unwind to FROM block, so we can get the PlainStorageState and +/// PlainAccountState safely. There might be some state dependency from an address +/// which hasn't been changed in the given range. 
+async fn unwind_and_copy( + db_tool: &mut DbTool<'_, DB>, + from: u64, + tip_block_number: u64, + output_db: &reth_db::mdbx::Env, +) -> eyre::Result<()> { + let mut unwind_tx = Transaction::new(db_tool.db)?; + let mut exec_stage = ExecutionStage::default(); + + exec_stage + .unwind( + &mut unwind_tx, + UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None }, + ) + .await?; + + let unwind_inner_tx = unwind_tx.deref_mut(); + + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_table::(unwind_inner_tx))??; + + unwind_tx.drop()?; + + Ok(()) +} + +/// Try to re-execute the stage without committing +async fn dry_run( + output_db: reth_db::mdbx::Env, + to: u64, + from: u64, +) -> eyre::Result<()> { + info!(target: "reth::cli", "Executing stage. [dry-run]"); + + let mut tx = Transaction::new(&output_db)?; + let mut exec_stage = ExecutionStage::default(); + + exec_stage + .execute( + &mut tx, + reth_stages::ExecInput { + previous_stage: Some((StageId("Another"), to)), + stage_progress: Some(from), + }, + ) + .await?; + + tx.drop()?; + + info!(target: "reth::cli", "Success."); + + Ok(()) +} diff --git a/bin/reth/src/dump_stage/hashing_storage.rs b/bin/reth/src/dump_stage/hashing_storage.rs new file mode 100644 index 0000000000..adcd292de5 --- /dev/null +++ b/bin/reth/src/dump_stage/hashing_storage.rs @@ -0,0 +1,85 @@ +use crate::{ + db::DbTool, + dirs::{DbPath, PlatformPath}, + dump_stage::setup, +}; +use eyre::Result; +use reth_db::{database::Database, table::TableImporter, tables}; +use reth_provider::Transaction; +use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput}; +use std::ops::DerefMut; +use tracing::info; + +pub(crate) async fn dump_hashing_storage_stage( + db_tool: &mut DbTool<'_, DB>, + from: u64, + to: u64, + output_db: &PlatformPath, + should_run: bool, +) -> Result<()> { + let (output_db, 
tip_block_number) = setup::(from, to, output_db, db_tool)?; + + unwind_and_copy::(db_tool, from, tip_block_number, &output_db).await?; + + // Try to re-execute the stage without committing + if should_run { + dry_run(output_db, to, from).await?; + } + + Ok(()) +} + +/// Dry-run an unwind to FROM block and copy the necessary table data to the new database. +async fn unwind_and_copy( + db_tool: &mut DbTool<'_, DB>, + from: u64, + tip_block_number: u64, + output_db: &reth_db::mdbx::Env, +) -> eyre::Result<()> { + let mut unwind_tx = Transaction::new(db_tool.db)?; + let mut exec_stage = StorageHashingStage::default(); + + exec_stage + .unwind( + &mut unwind_tx, + UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None }, + ) + .await?; + let unwind_inner_tx = unwind_tx.deref_mut(); + + // TODO optimize we can actually just get the entries we need for both these tables + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + output_db.update(|tx| tx.import_dupsort::(unwind_inner_tx))??; + + unwind_tx.drop()?; + + Ok(()) +} + +/// Try to re-execute the stage without committing +async fn dry_run( + output_db: reth_db::mdbx::Env, + to: u64, + from: u64, +) -> eyre::Result<()> { + info!(target: "reth::cli", "Executing stage. [dry-run]"); + + let mut tx = Transaction::new(&output_db)?; + let mut stage = StorageHashingStage { clean_threshold: 1, ..Default::default() }; + + stage + .execute( + &mut tx, + reth_stages::ExecInput { + previous_stage: Some((StageId("Another"), to)), + stage_progress: Some(from), + }, + ) + .await?; + + tx.drop()?; + + info!(target: "reth::cli", "Success."); + + Ok(()) +} diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs new file mode 100644 index 0000000000..78cac15ce3 --- /dev/null +++ b/bin/reth/src/dump_stage/mod.rs @@ -0,0 +1,123 @@ +//! 
Database debugging tool +mod hashing_storage; +use hashing_storage::dump_hashing_storage_stage; + +mod execution; +use execution::dump_execution_stage; + +use crate::{ + db::DbTool, + dirs::{DbPath, PlatformPath}, +}; +use clap::Parser; +use reth_db::{ + cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, +}; +use reth_staged_sync::utils::init::init_db; +use tracing::info; + +/// `reth dump-stage` command +#[derive(Debug, Parser)] +pub struct Command { + /// The path to the database folder. + /// + /// Defaults to the OS-specific data directory: + /// + /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` + /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` + /// - macOS: `$HOME/Library/Application Support/reth/db` + #[arg(long, value_name = "PATH", verbatim_doc_comment, default_value_t)] + db: PlatformPath, + + #[clap(subcommand)] + command: Stages, +} + +/// Supported stages to be dumped +#[derive(Debug, Clone, Parser)] +pub enum Stages { + /// Execution stage. + Execution(StageCommand), + /// StorageHashing stage. + StorageHashing(StageCommand), +} + +/// Stage command that takes a range +#[derive(Debug, Clone, Parser)] +pub struct StageCommand { + /// The path to the new database folder. + /// + /// Defaults to the OS-specific data directory: + /// + /// - Linux: `$XDG_DATA_HOME/reth/db` or `$HOME/.local/share/reth/db` + /// - Windows: `{FOLDERID_RoamingAppData}/reth/db` + /// - macOS: `$HOME/Library/Application Support/reth/db` + #[arg(long, value_name = "OUTPUT_PATH", verbatim_doc_comment, default_value_t)] + output_db: PlatformPath, + /// From which block. + #[arg(long, short)] + from: u64, + /// To which block. + #[arg(long, short)] + to: u64, + /// If passed, it will dry-run a stage execution from the newly created database right after + /// dumping. 
+ #[arg(long, short, default_value = "false")] + dry_run: bool, +} + +impl Command { + /// Execute `dump-stage` command + pub async fn execute(&self) -> eyre::Result<()> { + std::fs::create_dir_all(&self.db)?; + + // TODO: Auto-impl for Database trait + let db = reth_db::mdbx::Env::::open( + self.db.as_ref(), + reth_db::mdbx::EnvKind::RW, + )?; + + let mut tool = DbTool::new(&db)?; + + match &self.command { + Stages::Execution(StageCommand { output_db, from, to, dry_run, .. }) => { + dump_execution_stage(&mut tool, *from, *to, output_db, *dry_run).await? + } + Stages::StorageHashing(StageCommand { output_db, from, to, dry_run, .. }) => { + dump_hashing_storage_stage(&mut tool, *from, *to, output_db, *dry_run).await? + } + } + + Ok(()) + } +} + +/// Sets up the database and initial state on `BlockTransitionIndex`. Also returns the tip block +/// number. +pub(crate) fn setup( + from: u64, + to: u64, + output_db: &PlatformPath, + db_tool: &mut DbTool<'_, DB>, +) -> eyre::Result<(reth_db::mdbx::Env, u64)> { + assert!(from < to, "FROM block must be lower than TO block."); + + info!(target: "reth::cli", "Creating separate db at {}", output_db); + + let output_db = init_db(output_db)?; + + output_db.update(|tx| { + tx.import_table_with_range::( + &db_tool.db.tx()?, + Some(from - 1), + to + 1, + ) + })??; + + let (tip_block_number, _) = db_tool + .db + .view(|tx| tx.cursor_read::()?.last())?? 
+ .expect("some"); + + Ok((output_db, tip_block_number)) +} diff --git a/bin/reth/src/lib.rs b/bin/reth/src/lib.rs index 244d5ef62f..79faa46225 100644 --- a/bin/reth/src/lib.rs +++ b/bin/reth/src/lib.rs @@ -12,6 +12,7 @@ pub mod chain; pub mod cli; pub mod db; pub mod dirs; +pub mod dump_stage; pub mod node; pub mod p2p; pub mod prometheus_exporter; diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 56a8637733..aca8c6340e 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -360,8 +360,10 @@ impl Stage for ExecutionStage { for (key, storage) in storage_changeset_batch.into_iter().rev() { let address = key.address(); - if plain_storage_cursor.seek_by_key_subkey(address, storage.key)?.is_some() { - plain_storage_cursor.delete_current()?; + if let Some(v) = plain_storage_cursor.seek_by_key_subkey(address, storage.key)? { + if v.key == storage.key { + plain_storage_cursor.delete_current()?; + } } if storage.value != U256::ZERO { plain_storage_cursor.upsert(address, storage)?; diff --git a/crates/storage/db/src/abstraction/database.rs b/crates/storage/db/src/abstraction/database.rs index 10b6d652e5..6a8a9aac67 100644 --- a/crates/storage/db/src/abstraction/database.rs +++ b/crates/storage/db/src/abstraction/database.rs @@ -1,5 +1,6 @@ use crate::{ common::{Bounds, Sealed}, + table::TableImporter, transaction::{DbTx, DbTxMut}, Error, }; @@ -13,7 +14,7 @@ pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + S /// RO database transaction type TX: DbTx<'a> + Send + Sync; /// RW database transaction - type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync; + type TXMut: DbTxMut<'a> + DbTx<'a> + TableImporter<'a> + Send + Sync; } /// Main Database trait that spawns transactions to be executed. 
diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index 8974a22790..3160a5c54b 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -8,7 +8,7 @@ use crate::{ ReverseWalker, Walker, }, database::{Database, DatabaseGAT}, - table::{DupSort, Table}, + table::{DupSort, Table, TableImporter}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, Error, }; @@ -63,6 +63,10 @@ impl<'a> DbTx<'a> for TxMock { todo!() } + fn drop(self) { + todo!() + } + fn cursor_read(&self) -> Result<>::Cursor, Error> { todo!() } @@ -96,6 +100,8 @@ impl<'a> DbTxMut<'a> for TxMock { } } +impl<'a> TableImporter<'a> for TxMock {} + /// CUrsor that iterates over table pub struct CursorMock { _cursor: u32, diff --git a/crates/storage/db/src/abstraction/table.rs b/crates/storage/db/src/abstraction/table.rs index fb1583a47f..691d12db27 100644 --- a/crates/storage/db/src/abstraction/table.rs +++ b/crates/storage/db/src/abstraction/table.rs @@ -1,4 +1,8 @@ -use crate::Error; +use crate::{ + abstraction::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, + transaction::{DbTx, DbTxMut}, + Error, +}; use bytes::Bytes; use serde::Serialize; use std::{ @@ -76,3 +80,57 @@ pub trait DupSort: Table { /// Upstream docs: type SubKey: Key; } + +/// Allows duplicating tables across databases +pub trait TableImporter<'tx>: for<'a> DbTxMut<'a> { + /// Imports all table data from another transaction. + fn import_table>(&self, source_tx: &R) -> Result<(), Error> { + let mut destination_cursor = self.cursor_write::()?; + + for kv in source_tx.cursor_read::()?.walk(None)? { + let (k, v) = kv?; + destination_cursor.append(k, v)?; + } + + Ok(()) + } + + /// Imports table data from another transaction within a range. 
+ fn import_table_with_range>( + &self, + source_tx: &R, + from: Option<::Key>, + to: ::Key, + ) -> Result<(), Error> { + let mut destination_cursor = self.cursor_write::()?; + let mut source_cursor = source_tx.cursor_read::()?; + + for row in source_cursor.walk(from)? { + let (key, value) = row?; + let finished = key == to; + + destination_cursor.append(key, value)?; + + if finished { + break + } + } + + Ok(()) + } + + /// Imports all dupsort data from another transaction. + fn import_dupsort>(&self, source_tx: &R) -> Result<(), Error> { + let mut destination_cursor = self.cursor_dup_write::()?; + let mut cursor = source_tx.cursor_dup_read::()?; + + while let Some((k, _)) = cursor.next_no_dup()? { + for kv in cursor.walk_dup(Some(k), None)? { + let (k, v) = kv?; + destination_cursor.append_dup(k, v)?; + } + } + + Ok(()) + } +} diff --git a/crates/storage/db/src/abstraction/transaction.rs b/crates/storage/db/src/abstraction/transaction.rs index 81798cb08a..832a5e926e 100644 --- a/crates/storage/db/src/abstraction/transaction.rs +++ b/crates/storage/db/src/abstraction/transaction.rs @@ -39,6 +39,8 @@ pub trait DbTx<'tx>: for<'a> DbTxGAT<'a> { /// Commit for read only transaction will consume and free transaction and allows /// freeing of memory pages fn commit(self) -> Result; + /// Drops transaction + fn drop(self); /// Iterate over read only values in table. fn cursor_read(&self) -> Result<>::Cursor, Error>; /// Iterate over read only values in dup sorted table. 
diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index bfb1cd5d67..a525f041d5 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -2,7 +2,7 @@ use super::cursor::Cursor; use crate::{ - table::{Compress, DupSort, Encode, Table}, + table::{Compress, DupSort, Encode, Table, TableImporter}, tables::utils::decode_one, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, Error, @@ -57,6 +57,8 @@ impl<'a, K: TransactionKind, E: EnvironmentKind> DbTxMutGAT<'a> for Tx<'_, K, E> type DupCursorMut = Cursor<'a, RW, T>; } +impl<'a, E: EnvironmentKind> TableImporter<'a> for Tx<'_, RW, E> {} + impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> { // Iterate over read only values in database. fn cursor_read(&self) -> Result<>::Cursor, Error> { @@ -75,6 +77,10 @@ impl<'tx, K: TransactionKind, E: EnvironmentKind> DbTx<'tx> for Tx<'tx, K, E> { result } + fn drop(self) { + drop(self.inner) + } + fn get(&self, key: T::Key) -> Result::Value>, Error> { self.inner .get( diff --git a/crates/storage/db/src/tables/models/blocks.rs b/crates/storage/db/src/tables/models/blocks.rs index 8fa20190f6..a9f60ca222 100644 --- a/crates/storage/db/src/tables/models/blocks.rs +++ b/crates/storage/db/src/tables/models/blocks.rs @@ -97,6 +97,12 @@ impl From<(u64, H256)> for BlockNumHash { } } +impl From for BlockNumHash { + fn from(tpl: u64) -> Self { + BlockNumHash((tpl, H256::default())) + } +} + impl Encode for BlockNumHash { type Encoded = [u8; 40]; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index 747121cffb..c5e60053f6 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -88,6 +88,17 @@ where Ok(success) } + /// Drops the current inner transaction and open a new one. 
+ pub fn drop(&mut self) -> Result<(), DbError> { + if let Some(tx) = self.tx.take() { + drop(tx); + } + + self.tx = Some(self.db.tx_mut()?); + + Ok(()) + } + /// Open a new inner transaction. pub fn open(&mut self) -> Result<(), DbError> { self.tx = Some(self.db.tx_mut()?);