From 0d2f21412f186c9b85d37f8be12ac8cd1add3d92 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 12 May 2023 18:15:51 +0200 Subject: [PATCH] chore: split on new payload function (#2645) --- crates/consensus/beacon/src/engine/mod.rs | 143 ++++++++++++++-------- 1 file changed, 94 insertions(+), 49 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index b5a3d2ca92..9f4d0b5493 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -541,16 +541,41 @@ where /// /// These responses should adhere to the [Engine API Spec for /// `engine_newPayload`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification). + #[instrument(level = "trace", skip(self, payload), fields(block_hash= ?payload.block_hash, block_number = %payload.block_number.as_u64()), target = "consensus::engine")] fn on_new_payload(&mut self, payload: ExecutionPayload) -> PayloadStatus { - let block_number = payload.block_number.as_u64(); - let block_hash = payload.block_hash; - let parent_hash = payload.parent_hash; + trace!(target: "consensus::engine", "Received new payload"); - trace!(target: "consensus::engine", ?block_hash, block_number, "Received new payload"); + let block = match self.ensure_well_formed_payload(payload) { + Ok(block) => block, + Err(status) => return status, + }; + + let status = if self.sync.is_pipeline_idle() { + // we can only insert new payloads if the pipeline is _not_ running, because it holds + // exclusive access to the database + self.try_insert_new_payload(block) + } else { + self.try_buffer_payload(block) + }; + + trace!(target: "consensus::engine", ?status, "Returning payload status"); + status + } + + /// Ensures that the given payload does not violate any consensus rules that concern the block's + /// layout, like: + /// - missing or invalid base fee + /// - invalid extra data + /// - invalid transactions + fn ensure_well_formed_payload( + &self, + payload: ExecutionPayload, + ) -> Result { + let parent_hash = payload.parent_hash; let block = match SealedBlock::try_from(payload) { Ok(block) => block, Err(error) => { - error!(target: "consensus::engine", ?block_hash, block_number, ?error, "Invalid payload"); + error!(target: "consensus::engine", ?error, "Invalid payload"); let mut latest_valid_hash = None; if !error.is_block_hash_mismatch() { @@ -561,49 +586,25 @@ where } let status = PayloadStatusEnum::from(error); - return PayloadStatus::new(status, latest_valid_hash) + return Err(PayloadStatus::new(status, latest_valid_hash)) } }; - let header = block.header.clone(); + Ok(block) + } - let status = if self.sync.is_pipeline_idle() { - // we can only insert new payloads if the pipeline is _not_ running, because it holds - // exclusive access to the database - match self.blockchain.insert_block_without_senders(block) { - Ok(status) => { - let mut latest_valid_hash = None; - let status = match status { - BlockStatus::Valid => { - latest_valid_hash = Some(block_hash); - self.listeners.notify(BeaconConsensusEngineEvent::CanonicalBlockAdded( - block_number, - block_hash, - )); - PayloadStatusEnum::Valid - } - BlockStatus::Accepted => { - self.listeners.notify(BeaconConsensusEngineEvent::ForkBlockAdded( - block_number, - block_hash, - )); - PayloadStatusEnum::Accepted - } - BlockStatus::Disconnected => PayloadStatusEnum::Syncing, - }; - PayloadStatus::new(status, latest_valid_hash) - } - Err(error) => { - // payload is deemed invalid, insert it into the cache - self.invalid_headers.insert(header); - - let latest_valid_hash = - self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); - let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; - PayloadStatus::new(status, latest_valid_hash) - } - } - } else if let Err(error) = self.blockchain.buffer_block_without_sender(block) { + /// When the pipeline is actively syncing the tree is unable to commit any additional blocks + /// since the pipeline holds exclusive access to the database. + /// + /// In this scenario we buffer the payload in the tree if the payload is valid, once the + /// pipeline finished syncing the tree is then able to also use the buffered payloads to commit + /// to a (newer) canonical chain. + /// + /// This will return `SYNCING` on success, since the pipeline is running. and `INVALID` if we + /// were unable to buffer the payload because it is considered invalid. + fn try_buffer_payload(&mut self, block: SealedBlock) -> PayloadStatus { + let parent_hash = block.parent_hash; + if let Err(error) = self.blockchain.buffer_block_without_sender(block) { // received a new payload while we're still syncing to the target let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); @@ -612,9 +613,53 @@ where } else { // successfully buffered the block PayloadStatus::from_status(PayloadStatusEnum::Syncing) - }; - trace!(target: "consensus::engine", ?block_hash, block_number, ?status, "Returning payload status"); - status + } + } + + /// Attempts to insert a new payload into the tree. + /// + /// Caution: This expects that the pipeline is idle. + fn try_insert_new_payload(&mut self, block: SealedBlock) -> PayloadStatus { + debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle"); + + let parent_hash = block.parent_hash; + let block_hash = block.hash; + let block_number = block.number; + let header = block.header.clone(); + + match self.blockchain.insert_block_without_senders(block) { + Ok(status) => { + let mut latest_valid_hash = None; + let status = match status { + BlockStatus::Valid => { + latest_valid_hash = Some(block_hash); + self.listeners.notify(BeaconConsensusEngineEvent::CanonicalBlockAdded( + block_number, + block_hash, + )); + PayloadStatusEnum::Valid + } + BlockStatus::Accepted => { + self.listeners.notify(BeaconConsensusEngineEvent::ForkBlockAdded( + block_number, + block_hash, + )); + PayloadStatusEnum::Accepted + } + BlockStatus::Disconnected => PayloadStatusEnum::Syncing, + }; + PayloadStatus::new(status, latest_valid_hash) + } + Err(error) => { + // payload is deemed invalid, insert it into the cache + self.invalid_headers.insert(header); + + let latest_valid_hash = + self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); + let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; + PayloadStatus::new(status, latest_valid_hash) + } + } } /// Attempt to restore the tree with the finalized block number. @@ -664,8 +709,8 @@ where // [head..FCU.number] let hash = block.hash; - if !self.on_new_payload(block.into()).is_valid() { - // if the payload is invalid we run the pipeline + if !self.try_insert_new_payload(block).is_valid() { + // if the payload is invalid, we run the pipeline self.sync.set_pipeline_sync_target(hash); } }