From 4e2712bef273b9a2cd6278b5a09ad00436b456e6 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 5 Jul 2023 18:34:56 +0200 Subject: [PATCH] refactor: extract exceeds pipeline threshold (#3605) --- crates/consensus/beacon/src/engine/mod.rs | 121 ++++++++++++++-------- 1 file changed, 76 insertions(+), 45 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 26fc189e36..80c0071412 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -402,10 +402,76 @@ where /// Returns true if the distance from the local tip to the block is greater than the configured /// threshold + #[inline] fn exceeds_pipeline_run_threshold(&self, local_tip: u64, block: u64) -> bool { block > local_tip && block - local_tip > self.pipeline_run_threshold } + /// Returns the finalized hash to sync to if the distance from the local tip to the block is + /// greater than the configured threshold and we're not synced to the finalized block yet block + /// yet (if we've seen that block already). + /// + /// If this is invoked after a new block has been downloaded, the downloaded block could be the + /// (missing) finalized block. + fn can_pipeline_sync_to_finalized( + &self, + canonical_tip_num: u64, + target_block_number: u64, + downloaded_block: Option, + ) -> Option { + let sync_target_state = self.forkchoice_state_tracker.sync_target_state(); + + // check if the distance exceeds the threshold for pipeline sync + let mut exceeds_pipeline_run_threshold = + self.exceeds_pipeline_run_threshold(canonical_tip_num, target_block_number); + + // check if the downloaded block is the tracked finalized block + if let Some(ref buffered_finalized) = sync_target_state + .as_ref() + .and_then(|state| self.blockchain.buffered_header_by_hash(state.finalized_block_hash)) + { + // if we have buffered the finalized block, we should check how far + // we're off + exceeds_pipeline_run_threshold = + self.exceeds_pipeline_run_threshold(canonical_tip_num, buffered_finalized.number); + } + + // If this is invoked after we downloaded a block we can check if this block is the + // finalized block + if let (Some(downloaded_block), Some(ref state)) = (downloaded_block, sync_target_state) { + if downloaded_block.hash == state.finalized_block_hash { + // we downloaded the finalized block + exceeds_pipeline_run_threshold = + self.exceeds_pipeline_run_threshold(canonical_tip_num, downloaded_block.number); + } + } + + // if the number of missing blocks is greater than the max, run the + // pipeline + if exceeds_pipeline_run_threshold { + if let Some(state) = sync_target_state { + // if we have already canonicalized the finalized block, we should + // skip the pipeline run + match self.blockchain.header_by_hash_or_number(state.finalized_block_hash.into()) { + Err(err) => { + warn!(target: "consensus::engine", ?err, "Failed to get finalized block header"); + } + Ok(None) => { + // we don't have the block yet and the distance exceeds the allowed + // threshold + return Some(state.finalized_block_hash) + } + Ok(Some(_)) => { + // we're fully synced to the finalized block + // but we want to continue downloading the missing parent + } + } + } + } + + None + } + /// If validation fails, the response MUST contain the latest valid hash: /// /// - The block hash of the ancestor of the invalid payload satisfying the following two @@ -1101,52 +1167,17 @@ where ) { // compare the missing parent with the canonical tip let canonical_tip_num = self.blockchain.canonical_tip().number; - let sync_target_state = self.forkchoice_state_tracker.sync_target_state(); - trace!(target: "consensus::engine", ?downloaded_block, ?missing_parent, tip=?canonical_tip_num, "Handling disconnected block"); - - let mut exceeds_pipeline_run_threshold = - self.exceeds_pipeline_run_threshold(canonical_tip_num, missing_parent.number); - - // check if the downloaded block is the tracked safe block - if let Some(ref state) = sync_target_state { - if downloaded_block.hash == state.finalized_block_hash { - // we downloaded the finalized block - exceeds_pipeline_run_threshold = - self.exceeds_pipeline_run_threshold(canonical_tip_num, downloaded_block.number); - } else if let Some(buffered_finalized) = - self.blockchain.buffered_header_by_hash(state.finalized_block_hash) - { - // if we have buffered the finalized block, we should check how far - // we're off - exceeds_pipeline_run_threshold = self - .exceeds_pipeline_run_threshold(canonical_tip_num, buffered_finalized.number); - } - } - - // if the number of missing blocks is greater than the max, run the - // pipeline - if exceeds_pipeline_run_threshold { - if let Some(state) = sync_target_state { - // if we have already canonicalized the finalized block, we should - // skip the pipeline run - match self.blockchain.header_by_hash_or_number(state.finalized_block_hash.into()) { - Err(err) => { - warn!(target: "consensus::engine", ?err, "Failed to get finalized block header"); - } - Ok(None) => { - // we don't have the block yet and the distance exceeds the allowed - // threshold - self.sync.set_pipeline_sync_target(state.safe_block_hash); - // we can exit early here because the pipeline will take care of this - return - } - Ok(Some(_)) => { - // we're fully synced to the finalized block - // but we want to continue downloading the missing parent - } - } - } + if let Some(target) = self.can_pipeline_sync_to_finalized( + canonical_tip_num, + missing_parent.number, + Some(downloaded_block), + ) { + // we don't have the block yet and the distance exceeds the allowed + // threshold + self.sync.set_pipeline_sync_target(target); + // we can exit early here because the pipeline will take care of syncing + return } // continue downloading the missing parent