mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
chore: clean up traces (#481)
* chore: clean up traces * fix: typo in error
This commit is contained in:
@@ -84,7 +84,7 @@ pub enum DatabaseIntegrityError {
|
||||
hash: H256,
|
||||
},
|
||||
/// The cumulative transaction count is missing from the database.
|
||||
#[error("No cumulative tx count for ${number} ({hash})")]
|
||||
#[error("No cumulative tx count for #{number} ({hash})")]
|
||||
CumulativeTxCount {
|
||||
/// The block number key
|
||||
number: BlockNumber,
|
||||
|
||||
@@ -183,9 +183,14 @@ impl<DB: Database> Pipeline<DB> {
|
||||
let mut previous_stage = None;
|
||||
for (_, queued_stage) in self.stages.iter_mut().enumerate() {
|
||||
let stage_id = queued_stage.stage.id();
|
||||
trace!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
"Executing stage"
|
||||
);
|
||||
let next = queued_stage
|
||||
.execute(state, previous_stage, db)
|
||||
.instrument(info_span!("Running", stage = %stage_id))
|
||||
.instrument(info_span!("execute", stage = %stage_id))
|
||||
.await?;
|
||||
|
||||
match next {
|
||||
@@ -287,7 +292,11 @@ impl<DB: Database> QueuedStage<DB> {
|
||||
) -> Result<ControlFlow, PipelineError> {
|
||||
let stage_id = self.stage.id();
|
||||
if self.require_tip && !state.reached_tip() {
|
||||
warn!(stage = %stage_id, "Tip not reached as required by stage, skipping.");
|
||||
warn!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
"Tip not reached as required by stage, skipping."
|
||||
);
|
||||
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
|
||||
|
||||
// Stage requires us to reach the tip of the chain first, but we have
|
||||
@@ -304,7 +313,11 @@ impl<DB: Database> QueuedStage<DB> {
|
||||
.zip(state.max_block)
|
||||
.map_or(false, |(prev_progress, target)| prev_progress >= target);
|
||||
if stage_reached_max_block {
|
||||
warn!(stage = %stage_id, "Stage reached maximum block, skipping.");
|
||||
warn!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
"Stage reached maximum block, skipping."
|
||||
);
|
||||
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
|
||||
|
||||
// We reached the maximum block, so we skip the stage
|
||||
@@ -323,7 +336,13 @@ impl<DB: Database> QueuedStage<DB> {
|
||||
.await
|
||||
{
|
||||
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
|
||||
info!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
|
||||
info!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
%stage_progress,
|
||||
%done,
|
||||
"Stage made progress"
|
||||
);
|
||||
stage_id.save_progress(db.deref(), stage_progress)?;
|
||||
|
||||
state
|
||||
@@ -345,7 +364,12 @@ impl<DB: Database> QueuedStage<DB> {
|
||||
state.events_sender.send(PipelineEvent::Error { stage_id }).await?;
|
||||
|
||||
return if let StageError::Validation { block, error } = err {
|
||||
warn!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error: {error}");
|
||||
warn!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
bad_block = %block,
|
||||
"Stage encountered a validation error: {error}"
|
||||
);
|
||||
|
||||
// We unwind because of a validation error. If the unwind itself fails,
|
||||
// we bail entirely, otherwise we restart the execution loop from the
|
||||
@@ -355,12 +379,20 @@ impl<DB: Database> QueuedStage<DB> {
|
||||
bad_block: Some(block),
|
||||
})
|
||||
} else if err.is_fatal() {
|
||||
error!(stage = %stage_id, "Stage encountered a fatal error: {err}.");
|
||||
error!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
"Stage encountered a fatal error: {err}."
|
||||
);
|
||||
Err(err.into())
|
||||
} else {
|
||||
// On other errors we assume they are recoverable if we discard the
|
||||
// transaction and run the stage again.
|
||||
warn!(stage = %stage_id, "Stage encountered a non-fatal error: {err}. Retrying");
|
||||
warn!(
|
||||
target: "sync::pipeline",
|
||||
stage = %stage_id,
|
||||
"Stage encountered a non-fatal error: {err}. Retrying"
|
||||
);
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,8 +76,8 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
|
||||
|
||||
// Lookup the head and tip of the sync range
|
||||
let (head, tip) = self.get_head_and_tip(db, stage_progress).await?;
|
||||
trace!(
|
||||
target = "sync::stages::headers",
|
||||
debug!(
|
||||
target: "sync::stages::headers",
|
||||
"Syncing from tip {:?} to head {:?}",
|
||||
tip,
|
||||
head.hash()
|
||||
@@ -92,7 +92,11 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
|
||||
while let Some(headers) = stream.next().await {
|
||||
match headers.into_iter().collect::<Result<Vec<_>, _>>() {
|
||||
Ok(res) => {
|
||||
info!(len = res.len(), "Received headers");
|
||||
info!(
|
||||
target: "sync::stages::headers",
|
||||
len = res.len(),
|
||||
"Received headers"
|
||||
);
|
||||
|
||||
// Perform basic response validation
|
||||
self.validate_header_response(&res)?;
|
||||
@@ -103,15 +107,25 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
|
||||
}
|
||||
Err(e) => match e {
|
||||
DownloadError::Timeout => {
|
||||
warn!("No response for header request");
|
||||
warn!(
|
||||
target: "sync::stages::headers",
|
||||
"No response for header request"
|
||||
);
|
||||
return Err(StageError::Recoverable(DownloadError::Timeout.into()))
|
||||
}
|
||||
DownloadError::HeaderValidation { hash, error } => {
|
||||
error!("Validation error for header {hash}: {error}");
|
||||
error!(
|
||||
target: "sync::stages::headers",
|
||||
"Validation error for header {hash}: {error}"
|
||||
);
|
||||
return Err(StageError::Validation { block: stage_progress, error })
|
||||
}
|
||||
error => {
|
||||
error!(?error, "An unexpected error occurred");
|
||||
error!(
|
||||
target: "sync::stages::headers",
|
||||
?error,
|
||||
"An unexpected error occurred"
|
||||
);
|
||||
return Err(StageError::Recoverable(error.into()))
|
||||
}
|
||||
},
|
||||
@@ -160,7 +174,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
|
||||
let td: U256 = *db
|
||||
.get::<tables::HeaderTD>(block_key)?
|
||||
.ok_or(DatabaseIntegrityError::TotalDifficulty { number: height })?;
|
||||
// self.client.update_status(height, block_key.hash(), td);
|
||||
// TODO: This should happen in the last stage
|
||||
self.network_handle.update_status(height, block_key.hash(), td);
|
||||
Ok(())
|
||||
}
|
||||
@@ -214,7 +228,6 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
|
||||
loop {
|
||||
let _ = state_rcv.changed().await;
|
||||
let forkchoice = state_rcv.borrow();
|
||||
debug!(?forkchoice, "Received fork choice state");
|
||||
if !forkchoice.head_block_hash.is_zero() && forkchoice.head_block_hash != *head {
|
||||
return forkchoice.clone()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user