From dbd9a2bb35ba8fbcbaf9ec74e63efdee4a087d3f Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 27 Sep 2024 16:22:03 +0100 Subject: [PATCH] feat(exex): send canonical notifications when head is provided (#11280) --- crates/exex/exex/src/notifications.rs | 392 ++++++++++++++------------ crates/exex/exex/src/wal/cache.rs | 14 +- crates/exex/exex/src/wal/mod.rs | 22 +- crates/exex/exex/src/wal/storage.rs | 17 +- 4 files changed, 257 insertions(+), 188 deletions(-) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 3d5c4a3500..369a0586c0 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -1,6 +1,4 @@ use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle}; -use alloy_primitives::U256; -use eyre::OptionExt; use futures::{Stream, StreamExt}; use reth_chainspec::Head; use reth_evm::execute::BlockExecutorProvider; @@ -137,15 +135,16 @@ pub struct ExExNotificationsWithHead { provider: P, executor: E, notifications: Receiver, - #[allow(dead_code)] wal_handle: WalHandle, exex_head: ExExHead, - pending_sync: bool, + /// If true, then we need to check if the ExEx head is on the canonical chain and if not, + /// revert its head. + pending_check_canonical: bool, + /// If true, then we need to check if the ExEx head is behind the node head and if so, backfill + /// the missing blocks. + pending_check_backfill: bool, /// The backfill job to run before consuming any notifications. backfill_job: Option>, - /// Whether we're currently waiting for the node head to catch up to the same height as the - /// ExEx head. - node_head_catchup_in_progress: bool, } impl ExExNotificationsWithHead @@ -169,90 +168,76 @@ where notifications, wal_handle, exex_head, - pending_sync: true, + pending_check_canonical: true, + pending_check_backfill: true, backfill_job: None, - node_head_catchup_in_progress: false, } } - /// Compares the node head against the ExEx head, and synchronizes them in case of a mismatch. + /// Checks if the ExEx head is on the canonical chain. + /// + /// If the head block is not found in the database, it means we're not on the canonical chain + /// and we need to revert the notification with the ExEx head block. + fn check_canonical(&mut self) -> eyre::Result> { + if self.provider.header(&self.exex_head.block.hash)?.is_some() { + debug!(target: "exex::notifications", "ExEx head is on the canonical chain"); + return Ok(None) + } + + // If the head block is not found in the database, it means we're not on the canonical + // chain. + + // Get the committed notification for the head block from the WAL. + let Some(notification) = + self.wal_handle.get_committed_notification_by_block_hash(&self.exex_head.block.hash)? + else { + return Err(eyre::eyre!( + "Could not find notification for block hash {:?} in the WAL", + self.exex_head.block.hash + )) + }; + + // Update the head block hash to the parent hash of the first committed block. + let committed_chain = notification.committed_chain().unwrap(); + let new_exex_head = + (committed_chain.first().parent_hash, committed_chain.first().number - 1).into(); + debug!(target: "exex::notifications", old_exex_head = ?self.exex_head.block, new_exex_head = ?new_exex_head, "ExEx head updated"); + self.exex_head.block = new_exex_head; + + // Return an inverted notification. See the documentation for + // `ExExNotification::into_inverted`. + Ok(Some(notification.into_inverted())) + } + + /// Compares the node head against the ExEx head, and backfills if needed. + /// + /// CAUTON: This method assumes that the ExEx head is <= the node head, and that it's on the + /// canonical chain. /// /// Possible situations are: - /// - ExEx is behind the node head (`node_head.number < exex_head.number`). - /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). - /// Backfill from the node database. - /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). - /// Unwind the ExEx to the first block matching between the ExEx and the node, and then - /// bacfkill from the node database. - /// - ExEx is at the same block number (`node_head.number == exex_head.number`). - /// - ExEx is on the canonical chain (`exex_head.hash` is found in the node database). Nothing - /// to do. - /// - ExEx is not on the canonical chain (`exex_head.hash` is not found in the node database). - /// Unwind the ExEx to the first block matching between the ExEx and the node, and then - /// backfill from the node database. - /// - ExEx is ahead of the node head (`node_head.number > exex_head.number`). Wait until the - /// node head catches up to the ExEx head, and then repeat the synchronization process. - fn synchronize(&mut self) -> eyre::Result<()> { + /// - ExEx is behind the node head (`node_head.number < exex_head.number`). Backfill from the + /// node database. + /// - ExEx is at the same block number as the node head (`node_head.number == + /// exex_head.number`). Nothing to do. + fn check_backfill(&mut self) -> eyre::Result<()> { debug!(target: "exex::manager", "Synchronizing ExEx head"); let backfill_job_factory = BackfillJobFactory::new(self.executor.clone(), self.provider.clone()); match self.exex_head.block.number.cmp(&self.node_head.number) { std::cmp::Ordering::Less => { - // ExEx is behind the node head - - if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { - // ExEx is on the canonical chain - debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain"); - - if exex_header.number != self.exex_head.block.number { - eyre::bail!("ExEx head number does not match the hash") - } - - // ExEx is on the canonical chain, start backfill - let backfill = backfill_job_factory - .backfill(self.exex_head.block.number + 1..=self.node_head.number) - .into_stream(); - self.backfill_job = Some(backfill); - } else { - debug!(target: "exex::manager", "ExEx is behind the node head and not on the canonical chain"); - // ExEx is not on the canonical chain, first unwind it and then backfill - - // TODO(alexey): unwind and backfill - self.backfill_job = None; - } + // ExEx is behind the node head, start backfill + debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill"); + let backfill = backfill_job_factory + .backfill(self.exex_head.block.number + 1..=self.node_head.number) + .into_stream(); + self.backfill_job = Some(backfill); } - #[allow(clippy::branches_sharing_code)] std::cmp::Ordering::Equal => { - // ExEx is at the same block height as the node head - - if let Some(exex_header) = self.provider.header(&self.exex_head.block.hash)? { - // ExEx is on the canonical chain - debug!(target: "exex::manager", "ExEx is at the same block height as the node head and on the canonical chain"); - - if exex_header.number != self.exex_head.block.number { - eyre::bail!("ExEx head number does not match the hash") - } - - // ExEx is on the canonical chain and the same as the node head, no need to - // backfill - self.backfill_job = None; - } else { - // ExEx is not on the canonical chain, first unwind it and then backfill - debug!(target: "exex::manager", "ExEx is at the same block height as the node head but not on the canonical chain"); - - // TODO(alexey): unwind and backfill - self.backfill_job = None; - } + debug!(target: "exex::manager", "ExEx is at the node head"); } std::cmp::Ordering::Greater => { - debug!(target: "exex::manager", "ExEx is ahead of the node head"); - - // ExEx is ahead of the node head - - // TODO(alexey): wait until the node head is at the same height as the ExEx head - // and then repeat the process above - self.node_head_catchup_in_progress = true; + return Err(eyre::eyre!("ExEx is ahead of the node head")) } }; @@ -270,9 +255,18 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - if this.pending_sync { - this.synchronize()?; - this.pending_sync = false; + if this.pending_check_canonical { + if let Some(canonical_notification) = this.check_canonical()? { + return Poll::Ready(Some(Ok(canonical_notification))) + } + + // ExEx head is on the canonical chain, we no longer need to check it + this.pending_check_canonical = false; + } + + if this.pending_check_backfill { + this.check_backfill()?; + this.pending_check_backfill = false; } if let Some(backfill_job) = &mut this.backfill_job { @@ -286,71 +280,23 @@ where this.backfill_job = None; } - loop { - let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { - return Poll::Ready(None) - }; + let Some(notification) = ready!(this.notifications.poll_recv(cx)) else { + return Poll::Ready(None) + }; - // 1. Either committed or reverted chain from the notification. - // 2. Block number of the tip of the canonical chain: - // - For committed chain, it's the tip block number. - // - For reverted chain, it's the block number preceding the first block in the chain. - let (chain, tip) = notification - .committed_chain() - .map(|chain| (chain.clone(), chain.tip().number)) - .or_else(|| { - notification - .reverted_chain() - .map(|chain| (chain.clone(), chain.first().number - 1)) - }) - .unzip(); - - if this.node_head_catchup_in_progress { - // If we are waiting for the node head to catch up to the same height as the ExEx - // head, then we need to check if the ExEx is on the canonical chain. - - // Query the chain from the new notification for the ExEx head block number. - let exex_head_block = chain - .as_ref() - .and_then(|chain| chain.blocks().get(&this.exex_head.block.number)); - - // Compare the hash of the block from the new notification to the ExEx head - // hash. - if let Some((block, tip)) = exex_head_block.zip(tip) { - if block.hash() == this.exex_head.block.hash { - // ExEx is on the canonical chain, proceed with the notification - this.node_head_catchup_in_progress = false; - } else { - // ExEx is not on the canonical chain, synchronize - let tip = - this.provider.sealed_header(tip)?.ok_or_eyre("node head not found")?; - this.node_head = Head::new( - tip.number, - tip.hash(), - tip.difficulty, - U256::MAX, - tip.timestamp, - ); - this.synchronize()?; - } - } - } - - if notification - .committed_chain() - .or_else(|| notification.reverted_chain()) - .map_or(false, |chain| chain.first().number > this.exex_head.block.number) - { - return Poll::Ready(Some(Ok(notification))) - } + if let Some(committed_chain) = notification.committed_chain() { + this.exex_head.block = committed_chain.tip().num_hash(); + } else if let Some(reverted_chain) = notification.reverted_chain() { + let first_block = reverted_chain.first(); + this.exex_head.block = (first_block.parent_hash, first_block.number - 1).into(); } + + Poll::Ready(Some(Ok(notification))) } } #[cfg(test)] mod tests { - use std::future::poll_fn; - use crate::Wal; use super::*; @@ -363,7 +309,7 @@ mod tests { use reth_primitives::Block; use reth_provider::{ providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockWriter, - Chain, + Chain, DatabaseProviderFactory, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; use tokio::sync::mpsc; @@ -451,12 +397,6 @@ mod tests { Ok(()) } - #[ignore] - #[tokio::test] - async fn exex_notifications_behind_head_non_canonical() -> eyre::Result<()> { - Ok(()) - } - #[tokio::test] async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { let temp_dir = tempfile::tempdir().unwrap(); @@ -512,18 +452,112 @@ mod tests { Ok(()) } - #[ignore] #[tokio::test] async fn exex_notifications_same_head_non_canonical() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let temp_dir = tempfile::tempdir().unwrap(); + let mut wal = Wal::new(temp_dir.path()).unwrap(); + + let provider_factory = create_test_provider_factory(); + let genesis_hash = init_genesis(&provider_factory)?; + let genesis_block = provider_factory + .block(genesis_hash.into())? + .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let provider = BlockchainProvider2::new(provider_factory)?; + + let node_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + let node_head = Head { + number: node_head_block.number, + hash: node_head_block.hash(), + ..Default::default() + }; + let provider_rw = provider.database_provider_rw()?; + provider_rw.insert_block(node_head_block)?; + provider_rw.commit()?; + let node_head_notification = ExExNotification::ChainCommitted { + new: Arc::new( + BackfillJobFactory::new(EthExecutorProvider::mainnet(), provider.clone()) + .backfill(node_head.number..=node_head.number) + .next() + .ok_or_else(|| eyre::eyre!("failed to backfill"))??, + ), + }; + + let exex_head_block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, + ); + let exex_head = ExExHead { block: exex_head_block.num_hash() }; + let exex_head_notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![exex_head_block + .clone() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + wal.commit(&exex_head_notification)?; + + let new_notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + node_head.number + 1, + BlockParams { parent: Some(node_head.hash), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + + let (notifications_tx, notifications_rx) = mpsc::channel(1); + + notifications_tx.send(new_notification.clone()).await?; + + let mut notifications = ExExNotifications::new( + node_head, + provider, + EthExecutorProvider::mainnet(), + notifications_rx, + wal.handle(), + ) + .with_head(exex_head); + + // First notification is the revert of the ExEx head block to get back to the canonical + // chain + assert_eq!( + notifications.next().await.transpose()?, + Some(exex_head_notification.into_inverted()) + ); + // Second notification is the backfilled block from the canonical chain to get back to the + // canonical tip + assert_eq!(notifications.next().await.transpose()?, Some(node_head_notification)); + // Third notification is the actual notification that we sent before + assert_eq!(notifications.next().await.transpose()?, Some(new_notification)); + Ok(()) } #[tokio::test] async fn test_notifications_ahead_of_head() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); let mut rng = generators::rng(); let temp_dir = tempfile::tempdir().unwrap(); - let wal = Wal::new(temp_dir.path()).unwrap(); + let mut wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; @@ -538,6 +572,17 @@ mod tests { genesis_block.number + 1, BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, ); + let exex_head_notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![exex_head_block + .clone() + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + wal.commit(&exex_head_notification)?; let node_head = Head { number: genesis_block.number, hash: genesis_hash, ..Default::default() }; @@ -545,20 +590,23 @@ mod tests { block: BlockNumHash { number: exex_head_block.number, hash: exex_head_block.hash() }, }; + let new_notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?], + Default::default(), + None, + )), + }; + let (notifications_tx, notifications_rx) = mpsc::channel(1); - notifications_tx - .send(ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![exex_head_block - .clone() - .seal_with_senders() - .ok_or_eyre("failed to recover senders")?], - Default::default(), - None, - )), - }) - .await?; + notifications_tx.send(new_notification.clone()).await?; let mut notifications = ExExNotifications::new( node_head, @@ -569,29 +617,15 @@ mod tests { ) .with_head(exex_head); - // First notification is skipped because the node is catching up with the ExEx - let new_notification = poll_fn(|cx| Poll::Ready(notifications.poll_next_unpin(cx))).await; - assert!(new_notification.is_pending()); + // First notification is the revert of the ExEx head block to get back to the canonical + // chain + assert_eq!( + notifications.next().await.transpose()?, + Some(exex_head_notification.into_inverted()) + ); - // Imitate the node catching up with the ExEx by sending a notification for the missing - // block - let notification = ExExNotification::ChainCommitted { - new: Arc::new(Chain::new( - vec![random_block( - &mut rng, - exex_head_block.number + 1, - BlockParams { parent: Some(exex_head_block.hash()), ..Default::default() }, - ) - .seal_with_senders() - .ok_or_eyre("failed to recover senders")?], - Default::default(), - None, - )), - }; - notifications_tx.send(notification.clone()).await?; - - // Second notification is received because the node caught up with the ExEx - assert_eq!(notifications.next().await.transpose()?, Some(notification)); + // Second notification is the actual notification that we sent before + assert_eq!(notifications.next().await.transpose()?, Some(new_notification)); Ok(()) } diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index b3d29f1b39..cef27369eb 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -18,17 +18,17 @@ pub struct BlockCache { /// the cache with the same file ID. I.e. for each notification, there may be multiple blocks /// in the cache. files: RwLock>>, - /// A mapping of `Block Hash -> Block`. + /// A mapping of committed blocks `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// block. - blocks: DashMap, + committed_blocks: DashMap, } impl BlockCache { /// Creates a new instance of [`BlockCache`]. pub(super) fn new() -> Self { - Self { files: RwLock::new(BTreeMap::new()), blocks: DashMap::new() } + Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() } } /// Returns `true` if the cache is empty. @@ -95,6 +95,12 @@ impl BlockCache { Some((key, last_block)) } + /// Returns the file ID for the notification containing the given committed block hash, if it + /// exists. + pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option { + self.committed_blocks.get(block_hash).map(|entry| entry.0) + } + /// Inserts the blocks from the notification into the cache with the given file ID. /// /// First, inserts the reverted blocks (if any), then the committed blocks (if any). @@ -126,7 +132,7 @@ impl BlockCache { parent_hash: block.parent_hash, }; files.entry(file_id).or_default().push_back(cached_block); - self.blocks.insert(block.hash(), cached_block); + self.committed_blocks.insert(block.hash(), (file_id, cached_block)); } } } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 5d28cb3e03..d7aea3aafd 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -3,12 +3,14 @@ mod cache; pub use cache::BlockCache; mod storage; +use eyre::OptionExt; pub use storage::Storage; use std::{path::Path, sync::Arc}; use alloy_eips::BlockNumHash; use reth_exex_types::ExExNotification; +use reth_primitives::B256; use reth_tracing::tracing::{debug, instrument}; /// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes. @@ -135,7 +137,10 @@ impl WalInner { block.block.number == to_block.number && block.block.hash == to_block.hash { - let notification = self.storage.read_notification(file_id)?; + let notification = self + .storage + .read_notification(file_id)? + .ok_or_eyre("notification not found")?; if notification.committed_chain().unwrap().blocks().len() == 1 { unfinalized_from_file_id = Some( block_cache.peek().map(|(file_id, _)| *file_id).unwrap_or(u64::MAX), @@ -207,6 +212,21 @@ pub struct WalHandle { wal: Arc, } +impl WalHandle { + /// Returns the notification for the given committed block hash if it exists. + pub fn get_committed_notification_by_block_hash( + &self, + block_hash: &B256, + ) -> eyre::Result> { + let Some(file_id) = self.wal.block_cache.get_file_id_by_committed_block_hash(block_hash) + else { + return Ok(None) + }; + + self.wal.storage.read_notification(file_id) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 8953a6a4ed..817d57d193 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -84,16 +84,25 @@ impl Storage { &self, range: RangeInclusive, ) -> impl Iterator> + '_ { - range.map(move |id| self.read_notification(id).map(|notification| (id, notification))) + range.map(move |id| { + let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?; + + Ok((id, notification)) + }) } /// Reads the notification from the file with the given id. #[instrument(target = "exex::wal::storage", skip(self))] - pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result { + pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result> { let file_path = self.file_path(file_id); debug!(?file_path, "Reading notification from WAL"); - let mut file = File::open(&file_path)?; + let mut file = match File::open(&file_path) { + Ok(file) => file, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(err.into()), + }; + // TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved Ok(serde_json::from_reader(&mut file)?) } @@ -149,7 +158,7 @@ mod tests { let file_id = 0; storage.write_notification(file_id, ¬ification)?; let deserialized_notification = storage.read_notification(file_id)?; - assert_eq!(deserialized_notification, notification); + assert_eq!(deserialized_notification, Some(notification)); Ok(()) }