fix: dont exit engine on failed FCU (#2676)

This commit is contained in:
Matthias Seitz
2023-05-15 15:43:40 +02:00
committed by GitHub
parent 11cd0d4753
commit 5d99b126ee
4 changed files with 67 additions and 33 deletions

View File

@@ -39,16 +39,30 @@ impl From<reth_interfaces::db::Error> for BeaconConsensusEngineError {
///
/// This represents all possible error cases, that must be returned as JSON RCP errors back to the
/// beacon node.
#[derive(Debug, Clone, Copy, Eq, PartialEq, thiserror::Error)]
#[derive(Debug, thiserror::Error)]
pub enum BeaconForkChoiceUpdateError {
/// Thrown when a forkchoice update resulted in an error.
#[error("Forkchoice update error: {0}")]
ForkchoiceUpdateError(#[from] ForkchoiceUpdateError),
/// Internal errors, for example, error while reading from the database.
#[error(transparent)]
Internal(Box<reth_interfaces::Error>),
/// Thrown when the engine task is unavailable/stopped.
#[error("beacon consensus engine task stopped")]
EngineUnavailable,
}
impl From<reth_interfaces::Error> for BeaconForkChoiceUpdateError {
fn from(e: reth_interfaces::Error) -> Self {
Self::Internal(Box::new(e))
}
}
impl From<reth_interfaces::db::Error> for BeaconForkChoiceUpdateError {
fn from(e: reth_interfaces::db::Error) -> Self {
Self::Internal(Box::new(e.into()))
}
}
/// Represents all error cases when handling a new payload.
///
/// This represents all possible error cases that must be returned as JSON RCP errors back to the

View File

@@ -19,6 +19,7 @@ use tokio::sync::{mpsc::UnboundedSender, oneshot};
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct OnForkChoiceUpdated {
/// Tracks if this update was valid.
is_valid_update: bool,
/// Returns the result of the forkchoice update.
fut: Either<futures::future::Ready<ForkChoiceUpdateResult>, PendingPayloadId>,
@@ -129,7 +130,7 @@ pub enum BeaconEngineMessage {
/// The payload attributes for block building.
payload_attrs: Option<PayloadAttributes>,
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<OnForkChoiceUpdated>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
},
/// Add a new listener for [`BeaconEngineMessage`].
EventListener(UnboundedSender<BeaconConsensusEngineEvent>),

View File

@@ -94,7 +94,7 @@ impl BeaconConsensusEngineHandle {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await?
.await??
.await?)
}
@@ -104,7 +104,7 @@ impl BeaconConsensusEngineHandle {
&self,
state: ForkchoiceState,
payload_attrs: Option<PayloadAttributes>,
) -> oneshot::Receiver<OnForkChoiceUpdated> {
) -> oneshot::Receiver<Result<OnForkChoiceUpdated, reth_interfaces::Error>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
@@ -307,16 +307,53 @@ where
Some(status)
}
/// Invoked when we receive a new forkchoice update message.
///
/// Returns `true` if the engine now reached its maximum block number, See
/// [EngineSyncController::has_reached_max_block].
fn on_forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<PayloadAttributes>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
) -> bool {
self.metrics.forkchoice_updated_messages.increment(1);
let on_updated = match self.forkchoice_updated(state, attrs) {
Ok(response) => response,
Err(error) => {
let _ = tx.send(Err(error));
return false
}
};
let is_valid_response = on_updated.is_valid_update();
let _ = tx.send(Ok(on_updated));
// Terminate the sync early if it's reached the maximum user
// configured block.
if is_valid_response {
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
return true
}
}
false
}
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
fn on_forkchoice_updated(
///
/// Returns an error if an internal error occurred like a database error.
fn forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<PayloadAttributes>,
) -> Result<OnForkChoiceUpdated, BeaconConsensusEngineError> {
) -> Result<OnForkChoiceUpdated, reth_interfaces::Error> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
if state.head_block_hash.is_zero() {
return Ok(OnForkChoiceUpdated::invalid_state())
@@ -373,7 +410,7 @@ where
if let Error::Execution(ref err) = error {
if err.is_fatal() {
tracing::error!(target: "consensus::engine", ?err, "Encountered fatal error");
return Err(BeaconConsensusEngineError::Common(error))
return Err(error)
}
}
@@ -393,10 +430,7 @@ where
/// Sets the state of the canon chain tracker based on the given forkchoice update.
///
/// This should be called before issuing a VALID forkchoice update.
fn update_canon_chain(
&self,
update: &ForkchoiceState,
) -> Result<(), BeaconConsensusEngineError> {
fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), reth_interfaces::Error> {
if !update.finalized_block_hash.is_zero() {
let finalized = self
.blockchain
@@ -436,7 +470,7 @@ where
///
/// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
///
/// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
/// See [Self::forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
fn on_failed_canonical_forkchoice_update(
&mut self,
state: &ForkchoiceState,
@@ -778,24 +812,8 @@ where
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
this.metrics.forkchoice_updated_messages.increment(1);
let on_updated = match this.on_forkchoice_updated(state, payload_attrs) {
Ok(response) => response,
Err(error) => {
error!(target: "consensus::engine", ?state, ?error, "Error getting forkchoice updated response");
return Poll::Ready(Err(error))
}
};
let is_valid_response = on_updated.is_valid_update();
let _ = tx.send(on_updated);
// Terminate the sync early if it's reached the maximum user
// configured block.
if is_valid_response {
let tip_number = this.blockchain.canonical_tip().number;
if this.sync.has_reached_max_block(tip_number) {
return Poll::Ready(Ok(()))
}
if this.on_forkchoice_updated(state, payload_attrs, tx) {
return Poll::Ready(Ok(()))
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
@@ -1163,7 +1181,7 @@ mod tests {
let mut engine_rx = spawn_consensus_engine(consensus_engine);
let res = env.send_forkchoice_updated(ForkchoiceState::default()).await;
assert_eq!(
assert_matches!(
res,
Err(BeaconForkChoiceUpdateError::ForkchoiceUpdateError(
ForkchoiceUpdateError::InvalidState

View File

@@ -88,9 +88,10 @@ impl From<EngineApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE,
// Error responses from the consensus engine
EngineApiError::ForkChoiceUpdate(err) => match err {
BeaconForkChoiceUpdateError::ForkchoiceUpdateError(err) => return err.into(),
EngineApiError::ForkChoiceUpdate(ref err) => match err {
BeaconForkChoiceUpdateError::ForkchoiceUpdateError(err) => return (*err).into(),
BeaconForkChoiceUpdateError::EngineUnavailable => INTERNAL_ERROR_CODE,
BeaconForkChoiceUpdateError::Internal(_) => INTERNAL_ERROR_CODE,
},
EngineApiError::NewPayload(ref err) => match err {
BeaconOnNewPayloadError::EngineUnavailable => INTERNAL_ERROR_CODE,