mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 09:38:24 -05:00
chore(engine): BeaconConsensusEngine::current_engine_hook_context (#7676)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use crate::hooks::{
|
||||
EngineContext, EngineHook, EngineHookDBAccessLevel, EngineHookError, EngineHookEvent,
|
||||
EngineHook, EngineHookContext, EngineHookDBAccessLevel, EngineHookError, EngineHookEvent,
|
||||
EngineHooks,
|
||||
};
|
||||
use std::{
|
||||
@@ -49,7 +49,7 @@ impl EngineHooksController {
|
||||
pub(crate) fn poll_active_db_write_hook(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
args: EngineContext,
|
||||
args: EngineHookContext,
|
||||
) -> Poll<Result<PolledHook, EngineHookError>> {
|
||||
let Some(mut hook) = self.active_db_write_hook.take() else { return Poll::Pending };
|
||||
|
||||
@@ -98,7 +98,7 @@ impl EngineHooksController {
|
||||
pub(crate) fn poll_next_hook(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
args: EngineContext,
|
||||
args: EngineHookContext,
|
||||
db_write_active: bool,
|
||||
) -> Poll<Result<PolledHook, EngineHookError>> {
|
||||
let Some(mut hook) = self.hooks.pop_front() else { return Poll::Pending };
|
||||
@@ -127,7 +127,7 @@ impl EngineHooksController {
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
hook: &mut Box<dyn EngineHook>,
|
||||
args: EngineContext,
|
||||
args: EngineHookContext,
|
||||
db_write_active: bool,
|
||||
) -> Poll<Result<PolledHook, EngineHookError>> {
|
||||
// Hook with DB write access level is not allowed to run due to already running hook with DB
|
||||
@@ -166,7 +166,7 @@ impl EngineHooksController {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::hooks::{
|
||||
EngineContext, EngineHook, EngineHookDBAccessLevel, EngineHookEvent, EngineHooks,
|
||||
EngineHook, EngineHookContext, EngineHookDBAccessLevel, EngineHookEvent, EngineHooks,
|
||||
EngineHooksController,
|
||||
};
|
||||
use futures::poll;
|
||||
@@ -212,7 +212,7 @@ mod tests {
|
||||
fn poll(
|
||||
&mut self,
|
||||
_cx: &mut Context<'_>,
|
||||
_ctx: EngineContext,
|
||||
_ctx: EngineHookContext,
|
||||
) -> Poll<RethResult<EngineHookEvent>> {
|
||||
self.results.pop_front().map_or(Poll::Pending, Poll::Ready)
|
||||
}
|
||||
@@ -226,7 +226,7 @@ mod tests {
|
||||
async fn poll_active_db_write_hook() {
|
||||
let mut controller = EngineHooksController::new(EngineHooks::new());
|
||||
|
||||
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
|
||||
// No currently running hook with DB write access is set
|
||||
let result = poll!(poll_fn(|cx| controller.poll_active_db_write_hook(cx, context)));
|
||||
@@ -277,7 +277,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn poll_next_hook_db_write_active() {
|
||||
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
|
||||
let mut hook_rw = TestHook::new_rw("read-write");
|
||||
hook_rw.add_result(Ok(EngineHookEvent::Started));
|
||||
@@ -311,7 +311,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn poll_next_hook_db_write_inactive() {
|
||||
let context = EngineContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
let context = EngineHookContext { tip_block_number: 2, finalized_block_number: Some(1) };
|
||||
|
||||
let hook_rw_1_name = "read-write-1";
|
||||
let mut hook_rw_1 = TestHook::new_rw(hook_rw_1_name);
|
||||
|
||||
@@ -48,7 +48,7 @@ pub trait EngineHook: Send + Sync + 'static {
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
ctx: EngineHookContext,
|
||||
) -> Poll<RethResult<EngineHookEvent>>;
|
||||
|
||||
/// Returns [db access level][`EngineHookDBAccessLevel`] the hook needs.
|
||||
@@ -57,7 +57,7 @@ pub trait EngineHook: Send + Sync + 'static {
|
||||
|
||||
/// Engine context passed to the [hook polling function][`EngineHook::poll`].
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct EngineContext {
|
||||
pub struct EngineHookContext {
|
||||
/// Tip block number.
|
||||
pub tip_block_number: BlockNumber,
|
||||
/// Finalized block number, if known.
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Prune hook for the engine implementation.
|
||||
|
||||
use crate::{
|
||||
engine::hooks::{EngineContext, EngineHook, EngineHookError, EngineHookEvent},
|
||||
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
|
||||
hooks::EngineHookDBAccessLevel,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
@@ -121,7 +121,7 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
ctx: EngineHookContext,
|
||||
) -> Poll<RethResult<EngineHookEvent>> {
|
||||
// Try to spawn a pruner
|
||||
match self.try_spawn_pruner(ctx.tip_block_number) {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! StaticFile hook for the engine implementation.
|
||||
|
||||
use crate::{
|
||||
engine::hooks::{EngineContext, EngineHook, EngineHookError, EngineHookEvent},
|
||||
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
|
||||
hooks::EngineHookDBAccessLevel,
|
||||
};
|
||||
use futures::FutureExt;
|
||||
@@ -135,7 +135,7 @@ impl<DB: Database + 'static> EngineHook for StaticFileHook<DB> {
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
ctx: EngineContext,
|
||||
ctx: EngineHookContext,
|
||||
) -> Poll<RethResult<EngineHookEvent>> {
|
||||
let Some(finalized_block_number) = ctx.finalized_block_number else {
|
||||
trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is not available");
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
message::OnForkChoiceUpdated,
|
||||
metrics::EngineMetrics,
|
||||
},
|
||||
hooks::{EngineContext, EngineHooksController},
|
||||
hooks::{EngineHookContext, EngineHooksController},
|
||||
sync::{EngineSyncController, EngineSyncEvent},
|
||||
};
|
||||
use futures::{Future, StreamExt};
|
||||
@@ -326,6 +326,17 @@ where
|
||||
Ok((this, handle))
|
||||
}
|
||||
|
||||
/// Returns current [EngineHookContext] that's used for polling engine hooks.
|
||||
fn current_engine_hook_context(&self) -> RethResult<EngineHookContext> {
|
||||
Ok(EngineHookContext {
|
||||
tip_block_number: self.blockchain.canonical_tip().number,
|
||||
finalized_block_number: self
|
||||
.blockchain
|
||||
.finalized_block_number()
|
||||
.map_err(RethError::Provider)?,
|
||||
})
|
||||
}
|
||||
|
||||
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
|
||||
/// valid chain.
|
||||
///
|
||||
@@ -1831,16 +1842,9 @@ where
|
||||
loop {
|
||||
// Poll a running hook with db write access first, as we will not be able to process
|
||||
// any engine messages until it's finished.
|
||||
if let Poll::Ready(result) = this.hooks.poll_active_db_write_hook(
|
||||
cx,
|
||||
EngineContext {
|
||||
tip_block_number: this.blockchain.canonical_tip().number,
|
||||
finalized_block_number: this
|
||||
.blockchain
|
||||
.finalized_block_number()
|
||||
.map_err(RethError::Provider)?,
|
||||
},
|
||||
)? {
|
||||
if let Poll::Ready(result) =
|
||||
this.hooks.poll_active_db_write_hook(cx, this.current_engine_hook_context()?)?
|
||||
{
|
||||
this.on_hook_result(result)?;
|
||||
continue
|
||||
}
|
||||
@@ -1906,13 +1910,7 @@ where
|
||||
if !this.forkchoice_state_tracker.is_latest_invalid() {
|
||||
if let Poll::Ready(result) = this.hooks.poll_next_hook(
|
||||
cx,
|
||||
EngineContext {
|
||||
tip_block_number: this.blockchain.canonical_tip().number,
|
||||
finalized_block_number: this
|
||||
.blockchain
|
||||
.finalized_block_number()
|
||||
.map_err(RethError::Provider)?,
|
||||
},
|
||||
this.current_engine_hook_context()?,
|
||||
this.sync.is_pipeline_active(),
|
||||
)? {
|
||||
this.on_hook_result(result)?;
|
||||
|
||||
Reference in New Issue
Block a user