diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs
index 31dc822222..8c1518f309 100644
--- a/crates/exex/exex/src/manager.rs
+++ b/crates/exex/exex/src/manager.rs
@@ -34,6 +34,18 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};
/// or 17 minutes of 1-second blocks.
pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024;
+/// The source of the notification.
+///
+/// This distinguishment is needed to not commit any pipeline notificatations to [WAL](`Wal`),
+/// because they are already finalized.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ExExNotificationSource {
+ /// The notification was sent from the pipeline.
+ Pipeline,
+ /// The notification was sent from the blockchain tree.
+ BlockchainTree,
+}
+
/// Metrics for an `ExEx`.
#[derive(Metrics)]
#[metrics(scope = "exex")]
@@ -197,7 +209,7 @@ pub struct ExExManager
{
exex_handles: Vec,
/// [`ExExNotification`] channel from the [`ExExManagerHandle`]s.
- handle_rx: UnboundedReceiver,
+ handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>,
/// The minimum notification ID currently present in the buffer.
min_id: usize,
@@ -429,14 +441,23 @@ where
// Drain handle notifications
while this.buffer.len() < this.max_capacity {
- if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
- debug!(
- target: "exex::manager",
- committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
- reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
- "Received new notification"
- );
- this.wal.commit(¬ification)?;
+ if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) {
+ let committed_tip = notification.committed_chain().map(|chain| chain.tip().number);
+ let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number);
+ debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification");
+
+ // Commit to WAL only notifications from blockchain tree. Pipeline notifications
+ // always contain only finalized blocks.
+ match source {
+ ExExNotificationSource::BlockchainTree => {
+ debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL");
+ this.wal.commit(¬ification)?;
+ }
+ ExExNotificationSource::Pipeline => {
+ debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit");
+ }
+ }
+
this.push_notification(notification);
continue
}
@@ -491,7 +512,7 @@ where
#[derive(Debug)]
pub struct ExExManagerHandle {
/// Channel to send notifications to the `ExEx` manager.
- exex_tx: UnboundedSender,
+ exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>,
/// The number of `ExEx`'s running on the node.
num_exexs: usize,
/// A watch channel denoting whether the manager is ready for new notifications or not.
@@ -533,8 +554,12 @@ impl ExExManagerHandle {
/// Synchronously send a notification over the channel to all execution extensions.
///
/// Senders should call [`Self::has_capacity`] first.
- pub fn send(&self, notification: ExExNotification) -> Result<(), SendError> {
- self.exex_tx.send(notification)
+ pub fn send(
+ &self,
+ source: ExExNotificationSource,
+ notification: ExExNotification,
+ ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
+ self.exex_tx.send((source, notification))
}
/// Asynchronously send a notification over the channel to all execution extensions.
@@ -543,10 +568,11 @@ impl ExExManagerHandle {
/// capacity in the channel, the future will wait.
pub async fn send_async(
&mut self,
+ source: ExExNotificationSource,
notification: ExExNotification,
- ) -> Result<(), SendError> {
+ ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> {
self.ready().await;
- self.exex_tx.send(notification)
+ self.exex_tx.send((source, notification))
}
/// Get the current capacity of the `ExEx` manager's internal notification buffer.
@@ -610,16 +636,16 @@ impl Clone for ExExManagerHandle {
mod tests {
use super::*;
use alloy_primitives::B256;
- use futures::StreamExt;
+ use futures::{StreamExt, TryStreamExt};
use rand::Rng;
use reth_db_common::init::init_genesis;
use reth_evm_ethereum::execute::EthExecutorProvider;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader,
- Chain, TransactionVariant,
+ BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant,
};
- use reth_testing_utils::generators;
+ use reth_testing_utils::generators::{self, random_block, BlockParams};
fn empty_finalized_header_stream() -> ForkChoiceStream {
let (tx, rx) = watch::channel(None);
@@ -959,9 +985,21 @@ mod tests {
};
// Send notifications to go over the max capacity
- exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
- exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
- exex_manager.handle.exex_tx.send(notification).unwrap();
+ exex_manager
+ .handle
+ .exex_tx
+ .send((ExExNotificationSource::BlockchainTree, notification.clone()))
+ .unwrap();
+ exex_manager
+ .handle
+ .exex_tx
+ .send((ExExNotificationSource::BlockchainTree, notification.clone()))
+ .unwrap();
+ exex_manager
+ .handle
+ .exex_tx
+ .send((ExExNotificationSource::BlockchainTree, notification))
+ .unwrap();
// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
@@ -1177,6 +1215,18 @@ mod tests {
.sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash)
.unwrap()
.ok_or_else(|| eyre::eyre!("genesis block not found"))?;
+
+ let block = random_block(
+ &mut rng,
+ genesis_block.number + 1,
+ BlockParams { parent: Some(genesis_hash), ..Default::default() },
+ )
+ .seal_with_senders()
+ .unwrap();
+ let provider_rw = provider_factory.database_provider_rw().unwrap();
+ provider_rw.insert_block(block.clone()).unwrap();
+ provider_rw.commit().unwrap();
+
let provider = BlockchainProvider2::new(provider_factory).unwrap();
let temp_dir = tempfile::tempdir().unwrap();
@@ -1190,33 +1240,49 @@ mod tests {
wal.handle(),
);
- let notification = ExExNotification::ChainCommitted {
+ let genesis_notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)),
};
+ let notification = ExExNotification::ChainCommitted {
+ new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
+ };
let (finalized_headers_tx, rx) = watch::channel(None);
+ finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
let finalized_header_stream = ForkChoiceStream::new(rx);
let mut exex_manager = std::pin::pin!(ExExManager::new(
provider,
vec![exex_handle],
- 1,
+ 2,
wal,
finalized_header_stream
));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
- exex_manager.handle().send(notification.clone())?;
+ exex_manager
+ .handle()
+ .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?;
+ exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?;
assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
- assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone());
+ assert_eq!(
+ notifications.try_poll_next_unpin(&mut cx)?,
+ Poll::Ready(Some(genesis_notification))
+ );
+ assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending());
+ assert_eq!(
+ notifications.try_poll_next_unpin(&mut cx)?,
+ Poll::Ready(Some(notification.clone()))
+ );
+ // WAL shouldn't contain the genesis notification, because it's finalized
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::>>()?,
[notification.clone()]
);
- finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
+ finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
assert_eq!(
@@ -1229,7 +1295,7 @@ mod tests {
.send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into()))
.unwrap();
- finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
+ finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
// non-canonical block
@@ -1239,12 +1305,12 @@ mod tests {
);
// Send a `FinishedHeight` event with a canonical block
- events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap();
+ events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
- finalized_headers_tx.send(Some(genesis_block.header.clone()))?;
+ finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
// WAL is finalized
- assert!(exex_manager.wal.iter_notifications()?.next().is_none());
+ assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None);
Ok(())
}
diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs
index 5441f393d3..a3640690c1 100644
--- a/crates/node/builder/src/launch/exex.rs
+++ b/crates/node/builder/src/launch/exex.rs
@@ -6,7 +6,8 @@ use futures::future;
use reth_chain_state::ForkChoiceSubscriptions;
use reth_chainspec::EthChainSpec;
use reth_exex::{
- ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal, DEFAULT_EXEX_MANAGER_CAPACITY,
+ ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal,
+ DEFAULT_EXEX_MANAGER_CAPACITY,
};
use reth_node_api::{FullNodeComponents, NodeTypes};
use reth_primitives::Head;
@@ -47,6 +48,7 @@ impl ExExLauncher {
return Ok(None)
}
+ info!(target: "reth::cli", "Loading ExEx Write-Ahead Log...");
let exex_wal = Wal::new(
config_container
.config
@@ -127,7 +129,7 @@ impl ExExLauncher {
async move {
while let Ok(notification) = canon_state_notifications.recv().await {
handle
- .send_async(notification.into())
+ .send_async(ExExNotificationSource::BlockchainTree, notification.into())
.await
.expect("blockchain tree notification could not be sent to exex manager");
}
diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs
index df234e542b..7bb6ebc59e 100644
--- a/crates/stages/stages/src/stages/execution.rs
+++ b/crates/stages/stages/src/stages/execution.rs
@@ -9,7 +9,7 @@ use reth_evm::{
metrics::ExecutorMetrics,
};
use reth_execution_types::{Chain, ExecutionOutcome};
-use reth_exex::{ExExManagerHandle, ExExNotification};
+use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource};
use reth_primitives::{Header, SealedHeader, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
@@ -389,9 +389,10 @@ where
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
- let _ = self
- .exex_manager_handle
- .send(ExExNotification::ChainCommitted { new: Arc::new(chain) });
+ let _ = self.exex_manager_handle.send(
+ ExExNotificationSource::Pipeline,
+ ExExNotification::ChainCommitted { new: Arc::new(chain) },
+ );
Ok(())
}
@@ -477,8 +478,10 @@ where
// NOTE: We can ignore the error here, since an error means that the channel is closed,
// which means the manager has died, which then in turn means the node is shutting down.
- let _ =
- self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) });
+ let _ = self.exex_manager_handle.send(
+ ExExNotificationSource::Pipeline,
+ ExExNotification::ChainReverted { old: Arc::new(chain) },
+ );
Ok(())
}