diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs
index 3d5c4a3500..369a0586c0 100644
--- a/crates/exex/exex/src/notifications.rs
+++ b/crates/exex/exex/src/notifications.rs
@@ -1,6 +1,4 @@
use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle};
-use alloy_primitives::U256;
-use eyre::OptionExt;
use futures::{Stream, StreamExt};
use reth_chainspec::Head;
use reth_evm::execute::BlockExecutorProvider;
@@ -137,15 +135,16 @@ pub struct ExExNotificationsWithHead
{
provider: P,
executor: E,
notifications: Receiver,
- #[allow(dead_code)]
wal_handle: WalHandle,
exex_head: ExExHead,
- pending_sync: bool,
+ /// If true, then we need to check if the ExEx head is on the canonical chain and if not,
+ /// revert its head.
+ pending_check_canonical: bool,
+ /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill
+ /// the missing blocks.
+ pending_check_backfill: bool,
/// The backfill job to run before consuming any notifications.
backfill_job: Option>,
- /// Whether we're currently waiting for the node head to catch up to the same height as the
- /// ExEx head.
- node_head_catchup_in_progress: bool,
}
impl ExExNotificationsWithHead
@@ -169,90 +168,76 @@ where
notifications,
wal_handle,
exex_head,
- pending_sync: true,
+ pending_check_canonical: true,
+ pending_check_backfill: true,
backfill_job: None,
- node_head_catchup_in_progress: false,
}
}
- /// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch.
+ /// Checks if the ExEx head is on the canonical chain.
+ ///
+ /// If the head block is not found in the database, it means we're not on the canonical chain
+ /// and we need to revert the notification with the ExEx head block.
+ fn check_canonical(&mut self) -> eyre::Result> {
+ if self.provider.header(&self.exex_head.block.hash)?.is_some() {
+ debug!(target: "exex::notifications", "ExEx head is on the canonical chain");
+ return Ok(None)
+ }
+
+ // If the head block is not found in the database, it means we're not on the canonical
+ // chain.
+
+ // Get the committed notification for the head block from the WAL.
+ let Some(notification) =
+ self.wal_handle.get_committed_notification_by_block_hash(&self.exex_head.block.hash)?
+ else {
+ return Err(eyre::eyre!(
+ "Could not find notification for block hash {:?} in the WAL",
+ self.exex_head.block.hash
+ ))
+ };
+
+ // Update the head block hash to the parent hash of the first committed block.
+ let committed_chain = notification.committed_chain().unwrap();
+ let new_exex_head =
+ (committed_chain.first().parent_hash, committed_chain.first().number - 1).into();
+ debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated");
+ self.exex_head.block = new_exex_head;
+
+ // Return an inverted notification. See the documentation for
+ // `ExExNotification::into_inverted`.
+ Ok(Some(notification.into_inverted()))
+ }
+
+ /// Compares the node head against the ExEx head, and backfills if needed.
+ ///
+ /// CAUTION: This method assumes that the ExEx head is <= the node head, and that it's on the
+ /// canonical chain.
///
/// Possible situations are:
- /// - ExEx is behind the node head (`node_head.number < exex_head.number`).
- /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database).
- /// Backfill from the node database.
- /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
- /// Unwind the ExEx to the first block matching between the ExEx and the node, and then
- /// bacfkill from the node database.
- /// - ExEx is at the same block number (`node_head.number == exex_head.number`).
- /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing
- /// to do.
- /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database).
- /// Unwind the ExEx to the first block matching between the ExEx and the node, and then
- /// backfill from the node database.
- /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the
- /// node head catches up to the ExEx head, and then repeat the synchronization process.
- fn synchronize(&mut self) -> eyre::Result<()> {
+ /// - ExEx is behind the node head (`exex_head.number < node_head.number`). Backfill from the
+ /// node database.
+ /// - ExEx is at the same block number as the node head (`node_head.number ==
+ /// exex_head.number`). Nothing to do.
+ fn check_backfill(&mut self) -> eyre::Result<()> {
debug!(target: "exex::manager", "Synchronizing ExEx head");
let backfill_job_factory =
BackfillJobFactory::new(self.executor.clone(), self.provider.clone());
match self.exex_head.block.number.cmp(&self.node_head.number) {
std::cmp::Ordering::Less => {
- // ExEx is behind the node head
-
- if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
- // ExEx is on the canonical chain
- debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain");
-
- if exex_header.number != self.exex_head.block.number {
- eyre::bail!("ExEx head number does not match the hash")
- }
-
- // ExEx is on the canonical chain, start backfill
- let backfill = backfill_job_factory
- .backfill(self.exex_head.block.number + 1..=self.node_head.number)
- .into_stream();
- self.backfill_job = Some(backfill);
- } else {
- debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain");
- // ExEx is not on the canonical chain, first unwind it and then backfill
-
- // TODO(alexey): unwind and backfill
- self.backfill_job = None;
- }
+ // ExEx is behind the node head, start backfill
+ debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill");
+ let backfill = backfill_job_factory
+ .backfill(self.exex_head.block.number + 1..=self.node_head.number)
+ .into_stream();
+ self.backfill_job = Some(backfill);
}
- #[allow(clippy::branches_sharing_code)]
std::cmp::Ordering::Equal => {
- // ExEx is at the same block height as the node head
-
- if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? {
- // ExEx is on the canonical chain
- debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain");
-
- if exex_header.number != self.exex_head.block.number {
- eyre::bail!("ExEx head number does not match the hash")
- }
-
- // ExEx is on the canonical chain and the same as the node head, no need to
- // backfill
- self.backfill_job = None;
- } else {
- // ExEx is not on the canonical chain, first unwind it and then backfill
- debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain");
-
- // TODO(alexey): unwind and backfill
- self.backfill_job = None;
- }
+ debug!(target: "exex::manager", "ExEx is at the node head");
}
std::cmp::Ordering::Greater => {
- debug!(target: "exex::manager", "ExEx is ahead of the node head");
-
- // ExEx is ahead of the node head
-
- // TODO(alexey): wait until the node head is at the same height as the ExEx head
- // and then repeat the process above
- self.node_head_catchup_in_progress = true;
+ return Err(eyre::eyre!("ExEx is ahead of the node head"))
}
};
@@ -270,9 +255,18 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll > {
let this = self.get_mut();
- if this.pending_sync {
- this.synchronize()?;
- this.pending_sync = false;
+ if this.pending_check_canonical {
+ if let Some(canonical_notification) = this.check_canonical()? {
+ return Poll::Ready(Some(Ok(canonical_notification)))
+ }
+
+ // ExEx head is on the canonical chain, we no longer need to check it
+ this.pending_check_canonical = false;
+ }
+
+ if this.pending_check_backfill {
+ this.check_backfill()?;
+ this.pending_check_backfill = false;
}
if let Some(backfill_job) = &mut this.backfill_job {
@@ -286,71 +280,23 @@ where
this.backfill_job = None;
}
- loop {
- let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
- return Poll::Ready(None)
- };
+ let Some(notification) = ready!(this.notifications.poll_recv(cx)) else {
+ return Poll::Ready(None)
+ };
- // 1. Either committed or reverted chain from the notification.
- // 2. Block number of the tip of the canonical chain:
- // - For committed chain, it's the tip block number.
- // - For reverted chain, it's the block number preceding the first block in the chain.
- let (chain, tip) = notification
- .committed_chain()
- .map(|chain| (chain.clone(), chain.tip().number))
- .or_else(|| {
- notification
- .reverted_chain()
- .map(|chain| (chain.clone(), chain.first().number - 1))
- })
- .unzip();
-
- if this.node_head_catchup_in_progress {
- // If we are waiting for the node head to catch up to the same height as the ExEx
- // head, then we need to check if the ExEx is on the canonical chain.
-
- // Query the chain from the new notification for the ExEx head block number.
- let exex_head_block = chain
- .as_ref()
- .and_then(|chain| chain.blocks().get(&this.exex_head.block.number));
-
- // Compare the hash of the block from the new notification to the ExEx head
- // hash.
- if let Some((block, tip)) = exex_head_block.zip(tip) {
- if block.hash() == this.exex_head.block.hash {
- // ExEx is on the canonical chain, proceed with the notification
- this.node_head_catchup_in_progress = false;
- } else {
- // ExEx is not on the canonical chain, synchronize
- let tip =
- this.provider.sealed_header(tip)?.ok_or_eyre("node head not found")?;
- this.node_head = Head::new(
- tip.number,
- tip.hash(),
- tip.difficulty,
- U256::MAX,
- tip.timestamp,
- );
- this.synchronize()?;
- }
- }
- }
-
- if notification
- .committed_chain()
- .or_else(|| notification.reverted_chain())
- .map_or(false, |chain| chain.first().number > this.exex_head.block.number)
- {
- return Poll::Ready(Some(Ok(notification)))
- }
+ if let Some(committed_chain) = notification.committed_chain() {
+ this.exex_head.block = committed_chain.tip().num_hash();
+ } else if let Some(reverted_chain) = notification.reverted_chain() {
+ let first_block = reverted_chain.first();
+ this.exex_head.block = (first_block.parent_hash, first_block.number - 1).into();
}
+
+ Poll::Ready(Some(Ok(notification)))
}
}
#[cfg(test)]
mod tests {
- use std::future::poll_fn;
-
use crate::Wal;
use super::*;
@@ -363,7 +309,7 @@ mod tests {
use reth_primitives::Block;
use reth_provider::{
providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter,
- Chain,
+ Chain, DatabaseProviderFactory,
};
use reth_testing_utils::generators::{self, random_block, BlockParams};
use tokio::sync::mpsc;
@@ -451,12 +397,6 @@ mod tests {
Ok(())
}
- #[ignore]
- #[tokio::test]
- async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> {
- Ok(())
- }
-
#[tokio::test]
async fn exex_notifications_same_head_canonical() -> eyre::Result<()> {
let temp_dir = tempfile::tempdir().unwrap();
@@ -512,18 +452,112 @@ mod tests {
Ok(())
}
- #[ignore]
#[tokio::test]
async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> {
+ let mut rng = generators::rng();
+
+ let temp_dir = tempfile::tempdir().unwrap();
+ let mut wal = Wal::new(temp_dir.path()).unwrap();
+
+ let provider_factory = create_test_provider_factory();
+ let genesis_hash = init_genesis(&provider_factory)?;
+ let genesis_block = provider_factory
+ .block(genesis_hash.into())?
+ .ok_or_else(|| eyre::eyre!("genesis block not found"))?;
+
+ let provider = BlockchainProvider2::new(provider_factory)?;
+
+ let node_head_block = random_block(
+ &mut rng,
+ genesis_block.number + 1,
+ BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
+ )
+ .seal_with_senders()
+ .ok_or_eyre("failed to recover senders")?;
+ let node_head = Head {
+ number: node_head_block.number,
+ hash: node_head_block.hash(),
+ ..Default::default()
+ };
+ let provider_rw = provider.database_provider_rw()?;
+ provider_rw.insert_block(node_head_block)?;
+ provider_rw.commit()?;
+ let node_head_notification = ExExNotification::ChainCommitted {
+ new: Arc::new(
+ BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone())
+ .backfill(node_head.number..=node_head.number)
+ .next()
+ .ok_or_else(|| eyre::eyre!("failed to backfill"))??,
+ ),
+ };
+
+ let exex_head_block = random_block(
+ &mut rng,
+ genesis_block.number + 1,
+ BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
+ );
+ let exex_head = ExExHead { block: exex_head_block.num_hash() };
+ let exex_head_notification = ExExNotification::ChainCommitted {
+ new: Arc::new(Chain::new(
+ vec![exex_head_block
+ .clone()
+ .seal_with_senders()
+ .ok_or_eyre("failed to recover senders")?],
+ Default::default(),
+ None,
+ )),
+ };
+ wal.commit(&exex_head_notification)?;
+
+ let new_notification = ExExNotification::ChainCommitted {
+ new: Arc::new(Chain::new(
+ vec![random_block(
+ &mut rng,
+ node_head.number + 1,
+ BlockParams { parent: Some(node_head.hash), ..Default::default() },
+ )
+ .seal_with_senders()
+ .ok_or_eyre("failed to recover senders")?],
+ Default::default(),
+ None,
+ )),
+ };
+
+ let (notifications_tx, notifications_rx) = mpsc::channel(1);
+
+ notifications_tx.send(new_notification.clone()).await?;
+
+ let mut notifications = ExExNotifications::new(
+ node_head,
+ provider,
+ EthExecutorProvider::mainnet(),
+ notifications_rx,
+ wal.handle(),
+ )
+ .with_head(exex_head);
+
+ // First notification is the revert of the ExEx head block to get back to the canonical
+ // chain
+ assert_eq!(
+ notifications.next().await.transpose()?,
+ Some(exex_head_notification.into_inverted())
+ );
+ // Second notification is the backfilled block from the canonical chain to get back to the
+ // canonical tip
+ assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification));
+ // Third notification is the actual notification that we sent before
+ assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
+
Ok(())
}
#[tokio::test]
async fn test_notifications_ahead_of_head() -> eyre::Result<()> {
+ reth_tracing::init_test_tracing();
let mut rng = generators::rng();
let temp_dir = tempfile::tempdir().unwrap();
- let wal = Wal::new(temp_dir.path()).unwrap();
+ let mut wal = Wal::new(temp_dir.path()).unwrap();
let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
@@ -538,6 +572,17 @@ mod tests {
genesis_block.number + 1,
BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() },
);
+ let exex_head_notification = ExExNotification::ChainCommitted {
+ new: Arc::new(Chain::new(
+ vec![exex_head_block
+ .clone()
+ .seal_with_senders()
+ .ok_or_eyre("failed to recover senders")?],
+ Default::default(),
+ None,
+ )),
+ };
+ wal.commit(&exex_head_notification)?;
let node_head =
Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() };
@@ -545,20 +590,23 @@ mod tests {
block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() },
};
+ let new_notification = ExExNotification::ChainCommitted {
+ new: Arc::new(Chain::new(
+ vec![random_block(
+ &mut rng,
+ genesis_block.number + 1,
+ BlockParams { parent: Some(genesis_hash), ..Default::default() },
+ )
+ .seal_with_senders()
+ .ok_or_eyre("failed to recover senders")?],
+ Default::default(),
+ None,
+ )),
+ };
+
let (notifications_tx, notifications_rx) = mpsc::channel(1);
- notifications_tx
- .send(ExExNotification::ChainCommitted {
- new: Arc::new(Chain::new(
- vec![exex_head_block
- .clone()
- .seal_with_senders()
- .ok_or_eyre("failed to recover senders")?],
- Default::default(),
- None,
- )),
- })
- .await?;
+ notifications_tx.send(new_notification.clone()).await?;
let mut notifications = ExExNotifications::new(
node_head,
@@ -569,29 +617,15 @@ mod tests {
)
.with_head(exex_head);
- // First notification is skipped because the node is catching up with the ExEx
- let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await;
- assert!(new_notification.is_pending());
+ // First notification is the revert of the ExEx head block to get back to the canonical
+ // chain
+ assert_eq!(
+ notifications.next().await.transpose()?,
+ Some(exex_head_notification.into_inverted())
+ );
- // Imitate the node catching up with the ExEx by sending a notification for the missing
- // block
- let notification = ExExNotification::ChainCommitted {
- new: Arc::new(Chain::new(
- vec![random_block(
- &mut rng,
- exex_head_block.number + 1,
- BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() },
- )
- .seal_with_senders()
- .ok_or_eyre("failed to recover senders")?],
- Default::default(),
- None,
- )),
- };
- notifications_tx.send(notification.clone()).await?;
-
- // Second notification is received because the node caught up with the ExEx
- assert_eq!(notifications.next().await.transpose()?, Some(notification));
+ // Second notification is the actual notification that we sent before
+ assert_eq!(notifications.next().await.transpose()?, Some(new_notification));
Ok(())
}
diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs
index b3d29f1b39..cef27369eb 100644
--- a/crates/exex/exex/src/wal/cache.rs
+++ b/crates/exex/exex/src/wal/cache.rs
@@ -18,17 +18,17 @@ pub struct BlockCache {
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
/// in the cache.
files: RwLock>>,
- /// A mapping of `Block Hash -> Block`.
+ /// A mapping of committed blocks `Block Hash -> (File ID, Block)`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
- blocks: DashMap,
+ committed_blocks: DashMap,
}
impl BlockCache {
/// Creates a new instance of [`BlockCache`].
pub(super) fn new() -> Self {
- Self { files: RwLock::new(BTreeMap::new()), blocks: DashMap::new() }
+ Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() }
}
/// Returns `true` if the cache is empty.
@@ -95,6 +95,12 @@ impl BlockCache {
Some((key, last_block))
}
+ /// Returns the file ID for the notification containing the given committed block hash, if it
+ /// exists.
+ pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option {
+ self.committed_blocks.get(block_hash).map(|entry| entry.0)
+ }
+
/// Inserts the blocks from the notification into the cache with the given file ID.
///
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
@@ -126,7 +132,7 @@ impl BlockCache {
parent_hash: block.parent_hash,
};
files.entry(file_id).or_default().push_back(cached_block);
- self.blocks.insert(block.hash(), cached_block);
+ self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}
}
}
diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs
index 5d28cb3e03..d7aea3aafd 100644
--- a/crates/exex/exex/src/wal/mod.rs
+++ b/crates/exex/exex/src/wal/mod.rs
@@ -3,12 +3,14 @@
mod cache;
pub use cache::BlockCache;
mod storage;
+use eyre::OptionExt;
pub use storage::Storage;
use std::{path::Path, sync::Arc};
use alloy_eips::BlockNumHash;
use reth_exex_types::ExExNotification;
+use reth_primitives::B256;
use reth_tracing::tracing::{debug, instrument};
/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
@@ -135,7 +137,10 @@ impl WalInner {
block.block.number == to_block.number &&
block.block.hash == to_block.hash
{
- let notification = self.storage.read_notification(file_id)?;
+ let notification = self
+ .storage
+ .read_notification(file_id)?
+ .ok_or_eyre("notification not found")?;
if notification.committed_chain().unwrap().blocks().len() == 1 {
unfinalized_from_file_id = Some(
block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX),
@@ -207,6 +212,21 @@ pub struct WalHandle {
wal: Arc,
}
+impl WalHandle {
+ /// Returns the notification for the given committed block hash if it exists.
+ pub fn get_committed_notification_by_block_hash(
+ &self,
+ block_hash: &B256,
+ ) -> eyre::Result> {
+ let Some(file_id) = self.wal.block_cache.get_file_id_by_committed_block_hash(block_hash)
+ else {
+ return Ok(None)
+ };
+
+ self.wal.storage.read_notification(file_id)
+ }
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs
index 8953a6a4ed..817d57d193 100644
--- a/crates/exex/exex/src/wal/storage.rs
+++ b/crates/exex/exex/src/wal/storage.rs
@@ -84,16 +84,25 @@ impl Storage {
&self,
range: RangeInclusive,
) -> impl Iterator- > + '_ {
- range.map(move |id| self.read_notification(id).map(|notification| (id, notification)))
+ range.map(move |id| {
+ let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?;
+
+ Ok((id, notification))
+ })
}
/// Reads the notification from the file with the given id.
#[instrument(target = "exex::wal::storage", skip(self))]
- pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result
{
+ pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result> {
let file_path = self.file_path(file_id);
debug!(?file_path, "Reading notification from WAL");
- let mut file = File::open(&file_path)?;
+ let mut file = match File::open(&file_path) {
+ Ok(file) => file,
+ Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+ Err(err) => return Err(err.into()),
+ };
+
// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved
Ok(serde_json::from_reader(&mut file)?)
}
@@ -149,7 +158,7 @@ mod tests {
let file_id = 0;
storage.write_notification(file_id, ¬ification)?;
let deserialized_notification = storage.read_notification(file_id)?;
- assert_eq!(deserialized_notification, notification);
+ assert_eq!(deserialized_notification, Some(notification));
Ok(())
}