refactor: move node-core/engine to standalone crate (#9120)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Thomas Coratger
2024-06-26 17:34:35 +02:00
committed by GitHub
parent 8775a93d33
commit 1fde1dca1e
13 changed files with 70 additions and 12 deletions

View File

@@ -1,154 +0,0 @@
//! Stores engine API messages to disk for later inspection and replay.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_fs_util as fs;
use reth_rpc_types::{
engine::{CancunPayloadFields, ForkchoiceState},
ExecutionPayload,
};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
path::PathBuf,
pin::Pin,
task::{ready, Context, Poll},
time::SystemTime,
};
use tracing::*;
/// A message from the engine API that has been stored to disk.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum StoredEngineApiMessage<Attributes> {
/// The on-disk representation of an `engine_forkchoiceUpdated` method call.
ForkchoiceUpdated {
/// The [`ForkchoiceState`] sent in the persisted call.
state: ForkchoiceState,
/// The payload attributes sent in the persisted call, if any.
payload_attrs: Option<Attributes>,
},
/// The on-disk representation of an `engine_newPayload` method call.
NewPayload {
/// The [`ExecutionPayload`] sent in the persisted call.
payload: ExecutionPayload,
/// The Cancun-specific fields sent in the persisted call, if any.
cancun_fields: Option<CancunPayloadFields>,
},
}
/// This can read and write engine API messages in a specific directory.
#[derive(Debug)]
pub struct EngineMessageStore {
/// The path to the directory that stores the engine API messages.
path: PathBuf,
}
impl EngineMessageStore {
/// Creates a new [`EngineMessageStore`] at the given path.
///
/// The path is expected to be a directory, where individual message JSON files will be stored.
pub const fn new(path: PathBuf) -> Self {
Self { path }
}
/// Stores the received [`BeaconEngineMessage`] to disk, appending the `received_at` time to the
/// path.
pub fn on_message<Engine>(
&self,
msg: &BeaconEngineMessage<Engine>,
received_at: SystemTime,
) -> eyre::Result<()>
where
Engine: EngineTypes,
{
fs::create_dir_all(&self.path)?; // ensure that store path had been created
let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
serde_json::to_vec(&StoredEngineApiMessage::ForkchoiceUpdated {
state: *state,
payload_attrs: payload_attrs.clone(),
})?,
)?;
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx: _tx } => {
let filename = format!("{}-new_payload-{}.json", timestamp, payload.block_hash());
fs::write(
self.path.join(filename),
serde_json::to_vec(
&StoredEngineApiMessage::<Engine::PayloadAttributes>::NewPayload {
payload: payload.clone(),
cancun_fields: cancun_fields.clone(),
},
)?,
)?;
}
// noop
BeaconEngineMessage::TransitionConfigurationExchanged => (),
};
Ok(())
}
/// Finds and iterates through any stored engine API message files, ordered by timestamp.
pub fn engine_messages_iter(&self) -> eyre::Result<impl Iterator<Item = PathBuf>> {
let mut filenames_by_ts = BTreeMap::<u64, Vec<PathBuf>>::default();
for entry in fs::read_dir(&self.path)? {
let entry = entry?;
let filename = entry.file_name();
if let Some(filename) = filename.to_str().filter(|n| n.ends_with(".json")) {
if let Some(Ok(timestamp)) = filename.split('-').next().map(|n| n.parse::<u64>()) {
filenames_by_ts.entry(timestamp).or_default().push(entry.path());
tracing::debug!(target: "engine::store", timestamp, filename, "Queued engine API message");
} else {
tracing::warn!(target: "engine::store", %filename, "Could not parse timestamp from filename")
}
} else {
tracing::warn!(target: "engine::store", ?filename, "Skipping non json file");
}
}
Ok(filenames_by_ts.into_iter().flat_map(|(_, paths)| paths))
}
}
/// A wrapper stream that stores Engine API messages in
/// the specified directory.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineStoreStream<S> {
/// Inner message stream.
#[pin]
stream: S,
/// Engine message store.
store: EngineMessageStore,
}
impl<S> EngineStoreStream<S> {
/// Create new engine store stream wrapper.
pub const fn new(stream: S, path: PathBuf) -> Self {
Self { stream, store: EngineMessageStore::new(path) }
}
}
impl<Engine, S> Stream for EngineStoreStream<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = ready!(this.stream.poll_next_unpin(cx));
if let Some(msg) = &next {
if let Err(error) = this.store.on_message(msg, SystemTime::now()) {
error!(target: "engine::intercept", ?msg, %error, "Error handling Engine API message");
}
}
Poll::Ready(next)
}
}

View File

@@ -1,99 +0,0 @@
//! Collection of various stream utilities for consensus engine.
use futures::Stream;
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use std::path::PathBuf;
use tokio_util::either::Either;
pub mod engine_store;
use engine_store::EngineStoreStream;
pub mod skip_fcu;
use skip_fcu::EngineSkipFcu;
pub mod skip_new_payload;
use skip_new_payload::EngineSkipNewPayload;
/// The collection of stream extensions for engine API message stream.
pub trait EngineMessageStreamExt<Engine: EngineTypes>:
Stream<Item = BeaconEngineMessage<Engine>>
{
/// Skips the specified number of [`BeaconEngineMessage::ForkchoiceUpdated`] messages from the
/// engine message stream.
fn skip_fcu(self, count: usize) -> EngineSkipFcu<Self>
where
Self: Sized,
{
EngineSkipFcu::new(self, count)
}
/// If the count is [Some], returns the stream that skips the specified number of
/// [`BeaconEngineMessage::ForkchoiceUpdated`] messages. Otherwise, returns `Self`.
fn maybe_skip_fcu(self, maybe_count: Option<usize>) -> Either<EngineSkipFcu<Self>, Self>
where
Self: Sized,
{
if let Some(count) = maybe_count {
Either::Left(self.skip_fcu(count))
} else {
Either::Right(self)
}
}
/// Skips the specified number of [`BeaconEngineMessage::NewPayload`] messages from the
/// engine message stream.
fn skip_new_payload(self, count: usize) -> EngineSkipNewPayload<Self>
where
Self: Sized,
{
EngineSkipNewPayload::new(self, count)
}
/// If the count is [Some], returns the stream that skips the specified number of
/// [`BeaconEngineMessage::NewPayload`] messages. Otherwise, returns `Self`.
fn maybe_skip_new_payload(
self,
maybe_count: Option<usize>,
) -> Either<EngineSkipNewPayload<Self>, Self>
where
Self: Sized,
{
if let Some(count) = maybe_count {
Either::Left(self.skip_new_payload(count))
} else {
Either::Right(self)
}
}
/// Stores engine messages at the specified location.
fn store_messages(self, path: PathBuf) -> EngineStoreStream<Self>
where
Self: Sized,
{
EngineStoreStream::new(self, path)
}
/// If the path is [Some], returns the stream that stores engine messages at the specified
/// location. Otherwise, returns `Self`.
fn maybe_store_messages(
self,
maybe_path: Option<PathBuf>,
) -> Either<EngineStoreStream<Self>, Self>
where
Self: Sized,
{
if let Some(path) = maybe_path {
Either::Left(self.store_messages(path))
} else {
Either::Right(self)
}
}
}
impl<Engine, T> EngineMessageStreamExt<Engine> for T
where
Engine: EngineTypes,
T: Stream<Item = BeaconEngineMessage<Engine>>,
{
}

View File

@@ -1,64 +0,0 @@
//! Stream wrapper that skips specified number of FCUs.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::{BeaconEngineMessage, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
/// Engine API stream wrapper that skips the specified number of forkchoice updated messages.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineSkipFcu<S> {
#[pin]
stream: S,
/// The number of FCUs to skip.
threshold: usize,
/// Current count of skipped FCUs.
skipped: usize,
}
impl<S> EngineSkipFcu<S> {
/// Creates new [`EngineSkipFcu`] stream wrapper.
pub const fn new(stream: S, threshold: usize) -> Self {
Self {
stream,
threshold,
// Start with `threshold` so that the first FCU goes through.
skipped: threshold,
}
}
}
impl<Engine, S> Stream for EngineSkipFcu<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(target: "engine::intercept", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
continue
} else {
*this.skipped = 0;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
}
}
next => next,
};
return Poll::Ready(item)
}
}
}

View File

@@ -1,67 +0,0 @@
//! Stream wrapper that skips specified number of new payload messages.
use futures::{Stream, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_engine_primitives::EngineTypes;
use reth_rpc_types::engine::{PayloadStatus, PayloadStatusEnum};
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
/// Engine API stream wrapper that skips the specified number of new payload messages.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct EngineSkipNewPayload<S> {
#[pin]
stream: S,
/// The number of messages to skip.
threshold: usize,
/// Current count of skipped messages.
skipped: usize,
}
impl<S> EngineSkipNewPayload<S> {
/// Creates new [`EngineSkipNewPayload`] stream wrapper.
pub const fn new(stream: S, threshold: usize) -> Self {
Self { stream, threshold, skipped: 0 }
}
}
impl<Engine, S> Stream for EngineSkipNewPayload<S>
where
Engine: EngineTypes,
S: Stream<Item = BeaconEngineMessage<Engine>>,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx }) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(
target: "engine::intercept",
block_number = payload.block_number(),
block_hash = %payload.block_hash(),
?cancun_fields,
threshold=this.threshold,
skipped=this.skipped, "Skipping new payload"
);
let _ = tx.send(Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)));
continue
} else {
*this.skipped = 0;
Some(BeaconEngineMessage::NewPayload { payload, cancun_fields, tx })
}
}
next => next,
};
return Poll::Ready(item)
}
}
}

View File

@@ -11,7 +11,6 @@
pub mod args;
pub mod cli;
pub mod dirs;
pub mod engine;
pub mod exit;
pub mod metrics;
pub mod node_config;