diff --git a/Cargo.lock b/Cargo.lock index fbc34b79e4..033cf4ddd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1003,7 +1003,7 @@ dependencies = [ [[package]] name = "discv5" version = "0.1.0" -source = "git+https://github.com/mattsse/discv5?branch=matt/add-mut-value#3fcc98dd2dd86d56d8b43f3556f5171abed02f6d" +source = "git+https://github.com/mattsse/discv5?branch=matt/add-mut-value#0a70cf9190aad45adebf6fb68a9d6573ce8a3be7" dependencies = [ "aes 0.7.5", "aes-gcm", diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 74c88d9d5b..6befeb12fa 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -6,7 +6,7 @@ use futures_util::StreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::{Database, DatabaseGAT}, - models::{StoredBlockBody, StoredBlockOmmers}, + models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers}, tables, transaction::{DbTx, DbTxMut}, }; @@ -16,7 +16,7 @@ use reth_interfaces::{ }; use reth_primitives::{BlockNumber, SealedHeader}; use std::{fmt::Debug, sync::Arc}; -use tracing::{error, warn}; +use tracing::*; const BODIES: StageId = StageId("Bodies"); @@ -80,7 +80,7 @@ impl Stage for BodyStage Result { let previous_stage_progress = input.previous_stage_progress(); if previous_stage_progress == 0 { - warn!("The body stage seems to be running first, no work can be completed."); + error!(target: "sync::stages::bodies", "The body stage is running first, no work can be done"); return Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody { number: 0, })) @@ -93,6 +93,7 @@ impl Stage for BodyStage Stage for BodyStage()?; let mut tx_transition_cursor = tx.cursor_mut::()?; - // Get id for the first transaction in the block + // Get id for the first transaction and first transition in the block let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?; // NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator // on every iteration of the while loop -_- let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter()); let mut highest_block = stage_progress; + trace!(target: "sync::stages::bodies", stage_progress, target, start_tx_id = current_tx_id, transition_id, "Commencing sync"); while let Some(result) = bodies_stream.next().await { let Ok(response) = result else { - error!( - "Encountered an error downloading block {}: {:?}", - highest_block + 1, - result.unwrap_err() - ); + error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block"); return Ok(ExecOutput { stage_progress: highest_block, done: false, @@ -129,20 +127,21 @@ impl Stage for BodyStage { + trace!(target: "sync::stages::bodies", ommers = block.ommers.len(), txs = block.body.len(), ?numhash, "Writing full block"); + body_cursor.append( - block_key, + numhash, StoredBlockBody { start_tx_id: current_tx_id, tx_count: block.body.len() as u64, }, )?; ommers_cursor.append( - block_key, + numhash, StoredBlockOmmers { ommers: block .ommers @@ -164,8 +163,9 @@ impl Stage for BodyStage { + trace!(target: "sync::stages::bodies", ?numhash, "Writing empty block"); body_cursor.append( - block_key, + numhash, StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 }, )?; } @@ -175,12 +175,14 @@ impl Stage for BodyStage Stage for BodyStage Stage for ExecutionStage { // no more canonical blocks, we are done with execution. if canonical_batch.is_empty() { + info!(target: "sync::stages::execution", stage_progress = last_block, "Target block already reached"); return Ok(ExecOutput { stage_progress: last_block, done: true }) } @@ -131,8 +133,6 @@ impl Stage for ExecutionStage { // Fetch transactions, execute them and generate results let mut block_change_patches = Vec::with_capacity(canonical_batch.len()); for (header, body) in block_batch.iter() { - let num = header.number; - tracing::trace!(target: "stages::execution", ?num, "Execute block num."); // iterate over all transactions let mut tx_walker = tx_cursor.walk(body.start_tx_id)?; let mut transactions = Vec::with_capacity(body.tx_count as usize); @@ -141,6 +141,7 @@ impl Stage for ExecutionStage { let (tx_index, tx) = tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??; if tx_index != index { + error!(target: "sync::stages::execution", block = header.number, expected = index, found = tx_index, ?body, "Transaction gap"); return Err(DatabaseIntegrityError::TransactionsGap { missing: tx_index }.into()) } transactions.push(tx); @@ -154,6 +155,7 @@ impl Stage for ExecutionStage { .next() .ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??; if tx_index != index { + error!(target: "sync::stages::execution", block = header.number, expected = index, found = tx_index, ?body, "Signer gap"); return Err( DatabaseIntegrityError::TransactionsSignerGap { missing: tx_index }.into() ) @@ -172,6 +174,7 @@ impl Stage for ExecutionStage { // for now use default eth config let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**tx))); + trace!(target: "sync::stages::execution", number = header.number, txs = recovered_transactions.len(), "Executing block"); let change_set = std::thread::scope(|scope| { let handle = std::thread::Builder::new() .stack_size(50 * 1024 * 1024) @@ -195,6 +198,7 @@ impl Stage for ExecutionStage { // Get last tx count so that we can know amount of transaction in the block. let mut current_transition_id = tx.get_block_transition_by_num(last_block)? + 1; + info!(target: "sync::stages::execution", current_transition_id, blocks = block_change_patches.len(), "Inserting execution results"); // apply changes to plain database. for results in block_change_patches.into_iter() { @@ -205,6 +209,7 @@ impl Stage for ExecutionStage { let AccountChangeSet { account, wipe_storage, storage } = account_change_set; // apply account change to db. Updates AccountChangeSet and PlainAccountState // tables. + trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset"); account.apply_to_db(&**tx, address, current_transition_id)?; // wipe storage @@ -218,6 +223,8 @@ impl Stage for ExecutionStage { let mut hkey = H256::zero(); key.to_big_endian(&mut hkey.0); + trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset"); + // insert into StorageChangeSet tx.put::( storage_id.clone(), @@ -242,7 +249,9 @@ impl Stage for ExecutionStage { for (hash, bytecode) in result.new_bytecodes.into_iter() { // make different types of bytecode. Checked and maybe even analyzed (needs to // be packed). Currently save only raw bytes. - tx.put::(hash, bytecode.bytes()[..bytecode.len()].to_vec())?; + let bytecode = bytecode.bytes(); + trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode"); + tx.put::(hash, bytecode[..bytecode.len()].to_vec())?; // NOTE: bytecode bytes are not inserted in change set and it stand in saparate // table @@ -254,15 +263,17 @@ impl Stage for ExecutionStage { if let Some(block_reward_changeset) = results.block_reward { // we are sure that block reward index is present. for (address, changeset) in block_reward_changeset.into_iter() { + trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward"); changeset.apply_to_db(&**tx, address, current_transition_id)?; } current_transition_id += 1; } } - let last_block = last_block + canonical_batch.len() as u64; - let is_done = canonical_batch.len() < BATCH_SIZE as usize; - Ok(ExecOutput { done: is_done, stage_progress: last_block }) + let stage_progress = last_block + canonical_batch.len() as u64; + let done = canonical_batch.len() < BATCH_SIZE as usize; + info!(target: "sync::stages::execution", done, stage_progress, "Sync iteration finished"); + Ok(ExecOutput { done, stage_progress }) } /// Unwind the stage. diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 796d959669..070702954e 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -76,12 +76,7 @@ impl, _>>() { Ok(res) => { - info!( - target: "sync::stages::headers", - len = res.len(), - "Received headers" - ); + info!(target: "sync::stages::headers", len = res.len(), "Received headers"); // Perform basic response validation self.validate_header_response(&res)?; @@ -107,25 +98,15 @@ impl match e { DownloadError::Timeout => { - warn!( - target: "sync::stages::headers", - "No response for header request" - ); + warn!(target: "sync::stages::headers", "No response for header request"); return Err(StageError::Recoverable(DownloadError::Timeout.into())) } DownloadError::HeaderValidation { hash, error } => { - error!( - target: "sync::stages::headers", - "Validation error for header {hash}: {error}" - ); + error!(target: "sync::stages::headers", ?error, ?hash, "Validation error"); return Err(StageError::Validation { block: stage_progress, error }) } error => { - error!( - target: "sync::stages::headers", - ?error, - "An unexpected error occurred" - ); + error!(target: "sync::stages::headers", ?error, "Unexpected error"); return Err(StageError::Recoverable(error.into())) } }, @@ -133,6 +114,7 @@ impl(tx, &head)?; let stage_progress = current_progress.max( diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index 1d7163cf6f..94920d03ba 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -13,6 +13,7 @@ use reth_db::{ use reth_primitives::TxNumber; use std::fmt::Debug; use thiserror::Error; +use tracing::*; const SENDERS: StageId = StageId("Senders"); @@ -63,6 +64,7 @@ impl Stage for SendersStage { let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); if max_block_num <= stage_progress { + info!(target: "sync::stages::senders", target = max_block_num, stage_progress, "Target block already reached"); return Ok(ExecOutput { stage_progress, done: true }) } @@ -74,6 +76,7 @@ impl Stage for SendersStage { // No transactions to walk over if start_tx_index > end_tx_index { + info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Target transaction already reached"); return Ok(ExecOutput { stage_progress: max_block_num, done: true }) } @@ -88,17 +91,19 @@ impl Stage for SendersStage { .take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default()); // Iterate over transactions in chunks + info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Recovering senders"); for chunk in &entries.chunks(self.batch_size) { let transactions = chunk.collect::, DbError>>()?; // Recover signers for the chunk in parallel let recovered = transactions .into_par_iter() - .map(|(id, transaction)| { + .map(|(tx_id, transaction)| { + trace!(target: "sync::stages::senders", tx_id, hash = ?transaction.hash(), "Recovering sender"); let signer = transaction.recover_signer().ok_or_else::(|| { - SendersStageError::SenderRecovery { tx: id }.into() + SendersStageError::SenderRecovery { tx: tx_id }.into() })?; - Ok((id, signer)) + Ok((tx_id, signer)) }) .collect::, StageError>>()?; // Append the signers to the table @@ -106,6 +111,7 @@ impl Stage for SendersStage { } let done = max_block_num >= previous_stage_progress; + info!(target: "sync::stages::senders", stage_progress = max_block_num, done, "Sync iteration finished"); Ok(ExecOutput { stage_progress: max_block_num, done }) } diff --git a/crates/storage/db/src/tables/models/blocks.rs b/crates/storage/db/src/tables/models/blocks.rs index 84bb47f47e..cfae7a66fd 100644 --- a/crates/storage/db/src/tables/models/blocks.rs +++ b/crates/storage/db/src/tables/models/blocks.rs @@ -65,9 +65,15 @@ pub type HeaderHash = H256; /// element as BlockNumber, helps out with querying/sorting. /// /// Since it's used as a key, the `BlockNumber` is not compressed when encoding it. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct BlockNumHash(pub (BlockNumber, BlockHash)); +impl std::fmt::Debug for BlockNumHash { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("").field(&self.0 .0).field(&self.0 .1).finish() + } +} + impl BlockNumHash { /// Consumes `Self` and returns [`BlockNumber`], [`BlockHash`] pub fn take(self) -> (BlockNumber, BlockHash) {