feat(stages): send a notification to ExExes only when Execution stage commits (#8720)

This commit is contained in:
Alexey Shekhirin
2024-06-11 01:54:52 +01:00
committed by GitHub
parent 7a69941e86
commit 79113551ee
4 changed files with 86 additions and 23 deletions

View File

@@ -71,7 +71,7 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
executor.spawn_critical("exex", async move {
info!(target: "reth::cli", "ExEx started");
match exex.await {
Ok(_) => panic!("ExEx {id} finished. ExEx's should run indefinitely"),
Ok(_) => panic!("ExEx {id} finished. ExExes should run indefinitely"),
Err(err) => panic!("ExEx {id} crashed: {err}"),
}
});

View File

@@ -369,6 +369,8 @@ where
provider_rw.commit()?;
self.provider_factory.static_file_provider().commit()?;
stage.post_unwind_commit()?;
provider_rw = self.provider_factory.provider_rw()?;
}
Err(err) => {
@@ -479,6 +481,8 @@ where
self.provider_factory.static_file_provider().commit()?;
provider_rw.commit()?;
stage.post_execute_commit()?;
if done {
let block_number = checkpoint.block_number;
return Ok(if made_progress {

View File

@@ -238,12 +238,30 @@ pub trait Stage<DB: Database>: Send + Sync {
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Post execution commit hook.
///
/// This is called after the stage has been executed and the data has been committed by the
/// provider. The stage may want to pass some data from [`Self::execute`] via the internal
/// field.
fn post_execute_commit(&mut self) -> Result<(), StageError> {
Ok(())
}
/// Unwind the stage.
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError>;
/// Post unwind commit hook.
///
/// This is called after the stage has been unwound and the data has been committed by the
/// provider. The stage may want to pass some data from [`Self::unwind`] via the internal
/// field.
fn post_unwind_commit(&mut self) -> Result<(), StageError> {
Ok(())
}
}
/// [Stage] trait extension.

View File

@@ -75,6 +75,14 @@ pub struct ExecutionStage<E> {
external_clean_threshold: u64,
/// Pruning configuration.
prune_modes: PruneModes,
/// Input for the post execute commit hook.
/// Set after every [`ExecutionStage::execute`] and cleared after
/// [`ExecutionStage::post_execute_commit`].
post_execute_commit_input: Option<Chain>,
/// Input for the post unwind commit hook.
/// Set after every [`ExecutionStage::unwind`] and cleared after
/// [`ExecutionStage::post_unwind_commit`].
post_unwind_commit_input: Option<Chain>,
/// Handle to communicate with `ExEx` manager.
exex_manager_handle: ExExManagerHandle,
}
@@ -94,6 +102,8 @@ impl<E> ExecutionStage<E> {
executor_provider,
thresholds,
prune_modes,
post_execute_commit_input: None,
post_unwind_commit_input: None,
exex_manager_handle,
}
}
@@ -272,7 +282,7 @@ where
stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used;
// If we have ExEx's we need to save the block in memory for later
// If we have ExExes we need to save the block in memory for later
if self.exex_manager_handle.has_exexs() {
blocks.push(block);
}
@@ -294,23 +304,25 @@ where
let state = BundleStateWithReceipts::new(bundle, receipts, first_block, requests);
let write_preparation_duration = time.elapsed();
// Check if we should send a [`ExExNotification`] to execution extensions.
// Prepare the input for post execute commit hook, where an `ExExNotification` will be sent.
//
// Note: Since we only write to `blocks` if there are any ExEx's we don't need to perform
// Note: Since we only write to `blocks` if there are any ExExes, we don't need to perform
// the `has_exexs` check here as well
if !blocks.is_empty() {
let chain = Arc::new(Chain::new(
blocks.into_iter().map(|block| {
let hash = block.header.hash_slow();
block.seal(hash)
}),
state.clone(),
None,
));
let blocks = blocks.into_iter().map(|block| {
let hash = block.header.hash_slow();
block.seal(hash)
});
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self.exex_manager_handle.send(ExExNotification::ChainCommitted { new: chain });
let previous_input =
self.post_execute_commit_input.replace(Chain::new(blocks, state.clone(), None));
debug_assert!(
previous_input.is_none(),
"Previous post execute commit input wasn't processed"
);
if let Some(previous_input) = previous_input {
tracing::debug!(target: "sync::stages::execution", ?previous_input, "Previous post execute commit input wasn't processed");
}
}
let time = Instant::now();
@@ -338,6 +350,18 @@ where
})
}
fn post_execute_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_execute_commit_input.take() else { return Ok(()) };
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainCommitted { new: Arc::new(chain) });
Ok(())
}
/// Unwind the stage.
fn unwind(
&mut self,
@@ -357,17 +381,23 @@ where
// This also updates `PlainStorageState` and `PlainAccountState`.
let bundle_state_with_receipts = provider.unwind_or_peek_state::<true>(range.clone())?;
// Construct a `ExExNotification` if we have ExEx's installed.
// Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent.
if self.exex_manager_handle.has_exexs() {
// Get the blocks for the unwound range. This is needed for `ExExNotification`.
// Get the blocks for the unwound range.
let blocks = provider.get_take_block_range::<false>(range.clone())?;
let chain = Chain::new(blocks, bundle_state_with_receipts, None);
let previous_input = self.post_unwind_commit_input.replace(Chain::new(
blocks,
bundle_state_with_receipts,
None,
));
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ = self
.exex_manager_handle
.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
debug_assert!(
previous_input.is_none(),
"Previous post unwind commit input wasn't processed"
);
if let Some(previous_input) = previous_input {
tracing::debug!(target: "sync::stages::execution", ?previous_input, "Previous post unwind commit input wasn't processed");
}
}
// Unwind all receipts for transactions in the block range
@@ -404,6 +434,17 @@ where
Ok(UnwindOutput { checkpoint })
}
fn post_unwind_commit(&mut self) -> Result<(), StageError> {
let Some(chain) = self.post_unwind_commit_input.take() else { return Ok(()) };
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
let _ =
self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
Ok(())
}
}
fn execution_checkpoint(