From 6ded64355e7a45593a9f58cf5c3b1583ed8b73c0 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sat, 18 Nov 2023 00:14:14 -0800 Subject: [PATCH] pipeline: error on missing buffer in online stages (#5480) --- crates/stages/src/error.rs | 5 +++++ crates/stages/src/stages/bodies.rs | 16 +++++++++------- crates/stages/src/stages/headers.rs | 18 ++++++++++-------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 8795868d08..4a4df0d269 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -62,6 +62,10 @@ pub enum StageError { /// Invalid checkpoint passed to the stage #[error("invalid stage checkpoint: {0}")] StageCheckpoint(u64), + /// Missing download buffer on stage execution. + /// Returned if stage execution was called without polling for readiness. + #[error("missing download buffer")] + MissingDownloadBuffer, /// Download channel closed #[error("download channel closed")] ChannelClosed, @@ -97,6 +101,7 @@ impl StageError { StageError::Download(_) | StageError::DatabaseIntegrity(_) | StageError::StageCheckpoint(_) | + StageError::MissingDownloadBuffer | StageError::MissingSyncGap | StageError::ChannelClosed | StageError::Fatal(_) diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index cb908ebf95..fe9f583b9a 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -50,13 +50,13 @@ pub struct BodyStage { /// The body downloader. downloader: D, /// Block response buffer. - buffer: Vec, + buffer: Option>, } impl BodyStage { /// Create new bodies stage from downloader. pub fn new(downloader: D) -> Self { - Self { downloader, buffer: Vec::new() } + Self { downloader, buffer: None } } } @@ -71,7 +71,7 @@ impl Stage for BodyStage { cx: &mut Context<'_>, input: ExecInput, ) -> Poll> { - if input.target_reached() || !self.buffer.is_empty() { + if input.target_reached() || self.buffer.is_some() { return Poll::Ready(Ok(())) } @@ -85,7 +85,7 @@ impl Stage for BodyStage { // is a fatal error to prevent the pipeline from running forever. let response = match maybe_next_result { Some(Ok(downloaded)) => { - self.buffer.extend(downloaded); + self.buffer = Some(downloaded); Ok(()) } Some(Err(err)) => Err(err.into()), @@ -118,9 +118,11 @@ impl Stage for BodyStage { let mut next_tx_num = tx_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, start_tx_id = next_tx_num, "Commencing sync"); - trace!(target: "sync::stages::bodies", bodies_len = self.buffer.len(), "Writing blocks"); + + let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?; + trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks"); let mut highest_block = from_block; - for response in self.buffer.drain(..) { + for response in buffer { // Write block let block_number = response.block_number(); @@ -186,7 +188,7 @@ impl Stage for BodyStage { provider: &DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - self.buffer.clear(); + self.buffer.take(); let tx = provider.tx_ref(); // Cursors to unwind bodies, ommers diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 9ad06a198f..40ffa8d946 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -44,7 +44,7 @@ pub struct HeaderStage { /// Current sync gap. sync_gap: Option, /// Header buffer. - buffer: Vec, + buffer: Option>, } // === impl HeaderStage === @@ -55,7 +55,7 @@ where { /// Create a new header stage pub fn new(database: Provider, downloader: Downloader, mode: HeaderSyncMode) -> Self { - Self { provider: database, downloader, mode, sync_gap: None, buffer: Vec::new() } + Self { provider: database, downloader, mode, sync_gap: None, buffer: None } } fn is_stage_done( @@ -126,7 +126,8 @@ where let current_checkpoint = input.checkpoint(); // Return if buffer already has some items. - if !self.buffer.is_empty() { + if self.buffer.is_some() { + // TODO: review trace!( target: "sync::stages::headers", checkpoint = %current_checkpoint.block_number, @@ -159,7 +160,7 @@ where let result = match ready!(self.downloader.poll_next_unpin(cx)) { Some(Ok(headers)) => { info!(target: "sync::stages::headers", len = headers.len(), "Received headers"); - self.buffer.extend(headers); + self.buffer = Some(headers); Ok(()) } Some(Err(HeadersDownloaderError::DetachedHead { local_head, header, error })) => { @@ -179,15 +180,16 @@ where input: ExecInput, ) -> Result { let current_checkpoint = input.checkpoint(); - if self.buffer.is_empty() { + + let gap = self.sync_gap.clone().ok_or(StageError::MissingSyncGap)?; + if gap.is_closed() { return Ok(ExecOutput::done(current_checkpoint)) } - let gap = self.sync_gap.clone().ok_or(StageError::MissingSyncGap)?; let local_head = gap.local_head.number; let tip = gap.target.tip(); - let downloaded_headers = std::mem::take(&mut self.buffer); + let downloaded_headers = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?; let tip_block_number = match tip { // If tip is hash and it equals to the first downloaded header's hash, we can use // the block number of this header as tip. @@ -280,7 +282,7 @@ where provider: &DatabaseProviderRW<'_, &DB>, input: UnwindInput, ) -> Result { - self.buffer.clear(); + self.buffer.take(); self.sync_gap.take(); provider.unwind_table_by_walker::(