meta(sync): tracing (#455)

* meta(sync): tracing

* Apply suggestions from code review

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>

* apply comments

* upd discv5 dep

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Roman Krasiuk
2022-12-20 13:00:59 +02:00
committed by GitHub
parent c68a850c70
commit 0995ac2777
6 changed files with 60 additions and 52 deletions

2
Cargo.lock generated
View File

@@ -1003,7 +1003,7 @@ dependencies = [
[[package]]
name = "discv5"
version = "0.1.0"
source = "git+https://github.com/mattsse/discv5?branch=matt/add-mut-value#3fcc98dd2dd86d56d8b43f3556f5171abed02f6d"
source = "git+https://github.com/mattsse/discv5?branch=matt/add-mut-value#0a70cf9190aad45adebf6fb68a9d6573ce8a3be7"
dependencies = [
"aes 0.7.5",
"aes-gcm",

View File

@@ -6,7 +6,7 @@ use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::{Database, DatabaseGAT},
models::{StoredBlockBody, StoredBlockOmmers},
models::{BlockNumHash, StoredBlockBody, StoredBlockOmmers},
tables,
transaction::{DbTx, DbTxMut},
};
@@ -16,7 +16,7 @@ use reth_interfaces::{
};
use reth_primitives::{BlockNumber, SealedHeader};
use std::{fmt::Debug, sync::Arc};
use tracing::{error, warn};
use tracing::*;
const BODIES: StageId = StageId("Bodies");
@@ -80,7 +80,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
) -> Result<ExecOutput, StageError> {
let previous_stage_progress = input.previous_stage_progress();
if previous_stage_progress == 0 {
warn!("The body stage seems to be running first, no work can be completed.");
error!(target: "sync::stages::bodies", "The body stage is running first, no work can be done");
return Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::BlockBody {
number: 0,
}))
@@ -93,6 +93,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.commit_threshold);
if target <= stage_progress {
info!(target: "sync::stages::bodies", stage_progress, target, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
@@ -107,20 +108,17 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let mut block_transition_cursor = tx.cursor_mut::<tables::BlockTransitionIndex>()?;
let mut tx_transition_cursor = tx.cursor_mut::<tables::TxTransitionIndex>()?;
// Get id for the first transaction in the block
// Get id for the first transaction and first transition in the block
let (mut current_tx_id, mut transition_id) = tx.get_next_block_ids(starting_block)?;
// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = stage_progress;
trace!(target: "sync::stages::bodies", stage_progress, target, start_tx_id = current_tx_id, transition_id, "Commencing sync");
while let Some(result) = bodies_stream.next().await {
let Ok(response) = result else {
error!(
"Encountered an error downloading block {}: {:?}",
highest_block + 1,
result.unwrap_err()
);
error!(target: "sync::stages::bodies", block = highest_block + 1, error = ?result.unwrap_err(), "Error downloading block");
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
@@ -129,20 +127,21 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Write block
let block_header = response.header();
let block_number = block_header.number;
let block_key = block_header.num_hash().into();
let numhash: BlockNumHash = block_header.num_hash().into();
match response {
BlockResponse::Full(block) => {
trace!(target: "sync::stages::bodies", ommers = block.ommers.len(), txs = block.body.len(), ?numhash, "Writing full block");
body_cursor.append(
block_key,
numhash,
StoredBlockBody {
start_tx_id: current_tx_id,
tx_count: block.body.len() as u64,
},
)?;
ommers_cursor.append(
block_key,
numhash,
StoredBlockOmmers {
ommers: block
.ommers
@@ -164,8 +163,9 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
}
}
BlockResponse::Empty(_) => {
trace!(target: "sync::stages::bodies", ?numhash, "Writing empty block");
body_cursor.append(
block_key,
numhash,
StoredBlockBody { start_tx_id: current_tx_id, tx_count: 0 },
)?;
}
@@ -175,12 +175,14 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Increment the transition if the block contains an addition block reward.
// If the block does not have a reward, the transition will be the same as the
// transition at the last transaction of this block.
if self.consensus.has_block_reward(block_number) {
let has_reward = self.consensus.has_block_reward(numhash.number());
trace!(target: "sync::stages::bodies", has_reward, ?numhash, "Block reward");
if has_reward {
transition_id += 1;
}
block_transition_cursor.append(block_key, transition_id)?;
block_transition_cursor.append(numhash, transition_id)?;
highest_block = block_number;
highest_block = numhash.number();
}
// The stage is "done" if:
@@ -188,6 +190,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// - We reached our target and the target was not limited by the batch size of the stage
let capped = target < previous_stage_progress;
let done = highest_block < target || !capped;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: highest_block, done })
}

View File

@@ -18,6 +18,7 @@ use reth_executor::{
use reth_primitives::{Address, Header, StorageEntry, TransactionSignedEcRecovered, H256, U256};
use reth_provider::StateProviderImplRefLatest;
use std::fmt::Debug;
use tracing::*;
const EXECUTION: StageId = StageId("Execution");
@@ -108,6 +109,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// no more canonical blocks, we are done with execution.
if canonical_batch.is_empty() {
info!(target: "sync::stages::execution", stage_progress = last_block, "Target block already reached");
return Ok(ExecOutput { stage_progress: last_block, done: true })
}
@@ -131,8 +133,6 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// Fetch transactions, execute them and generate results
let mut block_change_patches = Vec::with_capacity(canonical_batch.len());
for (header, body) in block_batch.iter() {
let num = header.number;
tracing::trace!(target: "stages::execution", ?num, "Execute block num.");
// iterate over all transactions
let mut tx_walker = tx_cursor.walk(body.start_tx_id)?;
let mut transactions = Vec::with_capacity(body.tx_count as usize);
@@ -141,6 +141,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let (tx_index, tx) =
tx_walker.next().ok_or(DatabaseIntegrityError::EndOfTransactionTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = header.number, expected = index, found = tx_index, ?body, "Transaction gap");
return Err(DatabaseIntegrityError::TransactionsGap { missing: tx_index }.into())
}
transactions.push(tx);
@@ -154,6 +155,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
.next()
.ok_or(DatabaseIntegrityError::EndOfTransactionSenderTable)??;
if tx_index != index {
error!(target: "sync::stages::execution", block = header.number, expected = index, found = tx_index, ?body, "Signer gap");
return Err(
DatabaseIntegrityError::TransactionsSignerGap { missing: tx_index }.into()
)
@@ -172,6 +174,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// for now use default eth config
let state_provider = SubState::new(State::new(StateProviderImplRefLatest::new(&**tx)));
trace!(target: "sync::stages::execution", number = header.number, txs = recovered_transactions.len(), "Executing block");
let change_set = std::thread::scope(|scope| {
let handle = std::thread::Builder::new()
.stack_size(50 * 1024 * 1024)
@@ -195,6 +198,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// Get last tx count so that we can know amount of transaction in the block.
let mut current_transition_id = tx.get_block_transition_by_num(last_block)? + 1;
info!(target: "sync::stages::execution", current_transition_id, blocks = block_change_patches.len(), "Inserting execution results");
// apply changes to plain database.
for results in block_change_patches.into_iter() {
@@ -205,6 +209,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let AccountChangeSet { account, wipe_storage, storage } = account_change_set;
// apply account change to db. Updates AccountChangeSet and PlainAccountState
// tables.
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?account, wipe_storage, "Applying account changeset");
account.apply_to_db(&**tx, address, current_transition_id)?;
// wipe storage
@@ -218,6 +223,8 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let mut hkey = H256::zero();
key.to_big_endian(&mut hkey.0);
trace!(target: "sync::stages::execution", ?address, current_transition_id, ?hkey, ?old_value, ?new_value, "Applying storage changeset");
// insert into StorageChangeSet
tx.put::<tables::StorageChangeSet>(
storage_id.clone(),
@@ -242,7 +249,9 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
for (hash, bytecode) in result.new_bytecodes.into_iter() {
// make different types of bytecode. Checked and maybe even analyzed (needs to
// be packed). Currently save only raw bytes.
tx.put::<tables::Bytecodes>(hash, bytecode.bytes()[..bytecode.len()].to_vec())?;
let bytecode = bytecode.bytes();
trace!(target: "sync::stages::execution", ?hash, ?bytecode, len = bytecode.len(), "Inserting bytecode");
tx.put::<tables::Bytecodes>(hash, bytecode[..bytecode.len()].to_vec())?;
// NOTE: bytecode bytes are not inserted in change set and it stand in saparate
// table
@@ -254,15 +263,17 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
if let Some(block_reward_changeset) = results.block_reward {
// we are sure that block reward index is present.
for (address, changeset) in block_reward_changeset.into_iter() {
trace!(target: "sync::stages::execution", ?address, current_transition_id, "Applying block reward");
changeset.apply_to_db(&**tx, address, current_transition_id)?;
}
current_transition_id += 1;
}
}
let last_block = last_block + canonical_batch.len() as u64;
let is_done = canonical_batch.len() < BATCH_SIZE as usize;
Ok(ExecOutput { done: is_done, stage_progress: last_block })
let stage_progress = last_block + canonical_batch.len() as u64;
let done = canonical_batch.len() < BATCH_SIZE as usize;
info!(target: "sync::stages::execution", done, stage_progress, "Sync iteration finished");
Ok(ExecOutput { done, stage_progress })
}
/// Unwind the stage.

View File

@@ -76,12 +76,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
// Lookup the head and tip of the sync range
let (head, tip) = self.get_head_and_tip(tx, stage_progress).await?;
debug!(
target: "sync::stages::headers",
"Syncing from tip {:?} to head {:?}",
tip,
head.hash()
);
debug!(target: "sync::stages::headers", ?tip, head = ?head.hash(), "Commencing sync");
let mut current_progress = stage_progress;
let mut stream =
@@ -93,11 +88,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
while let Some(headers) = stream.next().await {
match headers.into_iter().collect::<Result<Vec<_>, _>>() {
Ok(res) => {
info!(
target: "sync::stages::headers",
len = res.len(),
"Received headers"
);
info!(target: "sync::stages::headers", len = res.len(), "Received headers");
// Perform basic response validation
self.validate_header_response(&res)?;
@@ -107,25 +98,15 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
}
Err(e) => match e {
DownloadError::Timeout => {
warn!(
target: "sync::stages::headers",
"No response for header request"
);
warn!(target: "sync::stages::headers", "No response for header request");
return Err(StageError::Recoverable(DownloadError::Timeout.into()))
}
DownloadError::HeaderValidation { hash, error } => {
error!(
target: "sync::stages::headers",
"Validation error for header {hash}: {error}"
);
error!(target: "sync::stages::headers", ?error, ?hash, "Validation error");
return Err(StageError::Validation { block: stage_progress, error })
}
error => {
error!(
target: "sync::stages::headers",
?error,
"An unexpected error occurred"
);
error!(target: "sync::stages::headers", ?error, "Unexpected error");
return Err(StageError::Recoverable(error.into()))
}
},
@@ -133,6 +114,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
}
// Write total difficulty values after all headers have been inserted
debug!(target: "sync::stages::headers", head = ?head.hash(), "Writing total difficulty");
self.write_td::<DB>(tx, &head)?;
let stage_progress = current_progress.max(

View File

@@ -13,6 +13,7 @@ use reth_db::{
use reth_primitives::TxNumber;
use std::fmt::Debug;
use thiserror::Error;
use tracing::*;
const SENDERS: StageId = StageId("Senders");
@@ -63,6 +64,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
if max_block_num <= stage_progress {
info!(target: "sync::stages::senders", target = max_block_num, stage_progress, "Target block already reached");
return Ok(ExecOutput { stage_progress, done: true })
}
@@ -74,6 +76,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
// No transactions to walk over
if start_tx_index > end_tx_index {
info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Target transaction already reached");
return Ok(ExecOutput { stage_progress: max_block_num, done: true })
}
@@ -88,17 +91,19 @@ impl<DB: Database> Stage<DB> for SendersStage {
.take_while(|res| res.as_ref().map(|(k, _)| *k <= end_tx_index).unwrap_or_default());
// Iterate over transactions in chunks
info!(target: "sync::stages::senders", start_tx_index, end_tx_index, "Recovering senders");
for chunk in &entries.chunks(self.batch_size) {
let transactions = chunk.collect::<Result<Vec<_>, DbError>>()?;
// Recover signers for the chunk in parallel
let recovered = transactions
.into_par_iter()
.map(|(id, transaction)| {
.map(|(tx_id, transaction)| {
trace!(target: "sync::stages::senders", tx_id, hash = ?transaction.hash(), "Recovering sender");
let signer =
transaction.recover_signer().ok_or_else::<StageError, _>(|| {
SendersStageError::SenderRecovery { tx: id }.into()
SendersStageError::SenderRecovery { tx: tx_id }.into()
})?;
Ok((id, signer))
Ok((tx_id, signer))
})
.collect::<Result<Vec<_>, StageError>>()?;
// Append the signers to the table
@@ -106,6 +111,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
}
let done = max_block_num >= previous_stage_progress;
info!(target: "sync::stages::senders", stage_progress = max_block_num, done, "Sync iteration finished");
Ok(ExecOutput { stage_progress: max_block_num, done })
}

View File

@@ -65,9 +65,15 @@ pub type HeaderHash = H256;
/// element as BlockNumber, helps out with querying/sorting.
///
/// Since it's used as a key, the `BlockNumber` is not compressed when encoding it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct BlockNumHash(pub (BlockNumber, BlockHash));
impl std::fmt::Debug for BlockNumHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("").field(&self.0 .0).field(&self.0 .1).finish()
}
}
impl BlockNumHash {
/// Consumes `Self` and returns [`BlockNumber`], [`BlockHash`]
pub fn take(self) -> (BlockNumber, BlockHash) {