fix(engine): run incomplete pipeline (#1999)

This commit is contained in:
Roman Krasiuk
2023-03-28 13:00:12 +03:00
committed by GitHub
parent 7263c9a644
commit 1ecbd55d45

View File

@@ -12,7 +12,7 @@ use reth_provider::ExecutorFactory;
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
};
use reth_stages::Pipeline;
use reth_stages::{stages::FINISH, Pipeline};
use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
@@ -131,19 +131,34 @@ where
&mut self,
state: ForkchoiceState,
_attrs: Option<PayloadAttributes>,
) -> ForkchoiceUpdated {
) -> Result<ForkchoiceUpdated, reth_interfaces::Error> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state");
if state.head_block_hash.is_zero() {
return ForkchoiceUpdated::new(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(),
}))
return Ok(ForkchoiceUpdated::new(PayloadStatus::from_status(
PayloadStatusEnum::Invalid {
validation_error: BeaconEngineError::ForkchoiceEmptyHead.to_string(),
},
)))
}
let is_first_forkchoice = self.forkchoice_state.is_none();
self.forkchoice_state = Some(state);
let status = if self.is_pipeline_idle() {
match self.blockchain_tree.make_canonical(&state.head_block_hash) {
Ok(_) => PayloadStatus::from_status(PayloadStatusEnum::Valid),
Ok(_) => {
let head_block_number = self
.db
.view(|tx| tx.get::<tables::HeaderNumbers>(state.head_block_hash))??
.expect("was canonicalized, so it exists");
let pipeline_min_progress =
FINISH.get_progress(&self.db.tx()?)?.unwrap_or_default();
if pipeline_min_progress < head_block_number {
self.require_pipeline_run(PipelineTarget::Head);
}
PayloadStatus::from_status(PayloadStatusEnum::Valid)
}
Err(error) => {
warn!(target: "consensus::engine", ?state, ?error, "Error canonicalizing the head hash");
// If this is the first forkchoice received, start downloading from safe block
@@ -169,7 +184,7 @@ where
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
};
trace!(target: "consensus::engine", ?state, ?status, "Returning forkchoice status");
ForkchoiceUpdated::new(status)
Ok(ForkchoiceUpdated::new(status))
}
/// When the Consensus layer receives a new block via the consensus gossip protocol,
@@ -309,7 +324,13 @@ where
while let Poll::Ready(Some(msg)) = this.message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let response = this.on_forkchoice_updated(state, payload_attrs);
let response = match this.on_forkchoice_updated(state, payload_attrs) {
Ok(response) => response,
Err(error) => {
error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response");
return Poll::Ready(Err(error.into()))
}
};
let is_valid_response =
matches!(response.payload_status.status, PayloadStatusEnum::Valid);
let _ = tx.send(Ok(response));
@@ -722,6 +743,7 @@ mod tests {
let genesis = random_block(0, None, None, Some(0));
let block1 = random_block(1, Some(genesis.hash), None, Some(0));
insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter());
env.db.update(|tx| FINISH.save_progress(tx, block1.number)).unwrap().unwrap();
let mut engine_rx = spawn_consensus_engine(consensus_engine);