mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-08 03:01:12 -04:00
fix(stage): fatal channel closed error (#1379)
This commit is contained in:
@@ -33,6 +33,12 @@ pub enum StageError {
|
||||
#[source]
|
||||
error: executor::Error,
|
||||
},
|
||||
/// Invalid checkpoint passed to the stage
|
||||
#[error("Invalid stage progress: {0}")]
|
||||
StageProgress(u64),
|
||||
/// Download channel closed
|
||||
#[error("Download channel closed")]
|
||||
ChannelClosed,
|
||||
/// The stage encountered a database integrity error.
|
||||
#[error("A database integrity error occurred: {0}")]
|
||||
DatabaseIntegrity(#[from] ProviderError),
|
||||
@@ -43,9 +49,6 @@ pub enum StageError {
|
||||
/// rely on external downloaders
|
||||
#[error("Invalid download response: {0}")]
|
||||
Download(#[from] DownloadError),
|
||||
/// Invalid checkpoint passed to the stage
|
||||
#[error("Invalid stage progress: {0}")]
|
||||
StageProgress(u64),
|
||||
/// The stage encountered a recoverable error.
|
||||
///
|
||||
/// These types of errors are caught by the [Pipeline][crate::Pipeline] and trigger a restart
|
||||
@@ -68,6 +71,7 @@ impl StageError {
|
||||
StageError::Download(_) |
|
||||
StageError::DatabaseIntegrity(_) |
|
||||
StageError::StageProgress(_) |
|
||||
StageError::ChannelClosed |
|
||||
StageError::Fatal(_)
|
||||
)
|
||||
}
|
||||
|
||||
@@ -101,13 +101,10 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
|
||||
let mut highest_block = input.stage_progress.unwrap_or_default();
|
||||
debug!(target: "sync::stages::bodies", stage_progress = highest_block, target = end_block, start_tx_id = current_tx_id, transition_id, "Commencing sync");
|
||||
|
||||
let downloaded_bodies = match self.downloader.try_next().await? {
|
||||
Some(downloaded_bodies) => downloaded_bodies,
|
||||
None => {
|
||||
info!(target: "sync::stages::bodies", stage_progress = highest_block, "Download stream exhausted");
|
||||
return Ok(ExecOutput { stage_progress: highest_block, done: true })
|
||||
}
|
||||
};
|
||||
// Task downloader can return `None` only if the response relaying channel was closed. This
|
||||
// is a fatal error to prevent the pipeline from running forever.
|
||||
let downloaded_bodies =
|
||||
self.downloader.try_next().await?.ok_or(StageError::ChannelClosed)?;
|
||||
|
||||
trace!(target: "sync::stages::bodies", bodies_len = downloaded_bodies.len(), "Writing blocks");
|
||||
for response in downloaded_bodies {
|
||||
|
||||
@@ -194,14 +194,10 @@ where
|
||||
self.downloader.update_sync_gap(gap.local_head, gap.target);
|
||||
|
||||
// The downloader returns the headers in descending order starting from the tip
|
||||
// down to the local head (latest block in db)
|
||||
let downloaded_headers = match self.downloader.next().await {
|
||||
Some(downloaded_headers) => downloaded_headers,
|
||||
None => {
|
||||
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Download stream exhausted");
|
||||
return Ok(ExecOutput { stage_progress: current_progress, done: true })
|
||||
}
|
||||
};
|
||||
// down to the local head (latest block in db).
|
||||
// Task downloader can return `None` only if the response relaying channel was closed. This
|
||||
// is a fatal error to prevent the pipeline from running forever.
|
||||
let downloaded_headers = self.downloader.next().await.ok_or(StageError::ChannelClosed)?;
|
||||
|
||||
info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user