From 90cb3629a5fdde03c1b6f937cc02bf45e1ef9e59 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 10 Oct 2024 14:13:14 +0100 Subject: [PATCH] feat(exex): commit only notifications with unfinalized blocks to WAL (#11638) --- crates/exex/exex/src/manager.rs | 124 ++++++++++++++----- crates/node/builder/src/launch/exex.rs | 6 +- crates/stages/stages/src/stages/execution.rs | 15 ++- 3 files changed, 108 insertions(+), 37 deletions(-) diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 31dc822222..8c1518f309 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -34,6 +34,18 @@ use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; /// or 17 minutes of 1-second blocks. pub const DEFAULT_EXEX_MANAGER_CAPACITY: usize = 1024; +/// The source of the notification. +/// +/// This distinguishment is needed to not commit any pipeline notificatations to [WAL](`Wal`), +/// because they are already finalized. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ExExNotificationSource { + /// The notification was sent from the pipeline. + Pipeline, + /// The notification was sent from the blockchain tree. + BlockchainTree, +} + /// Metrics for an `ExEx`. #[derive(Metrics)] #[metrics(scope = "exex")] @@ -197,7 +209,7 @@ pub struct ExExManager

{ exex_handles: Vec, /// [`ExExNotification`] channel from the [`ExExManagerHandle`]s. - handle_rx: UnboundedReceiver, + handle_rx: UnboundedReceiver<(ExExNotificationSource, ExExNotification)>, /// The minimum notification ID currently present in the buffer. min_id: usize, @@ -429,14 +441,23 @@ where // Drain handle notifications while this.buffer.len() < this.max_capacity { - if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) { - debug!( - target: "exex::manager", - committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number), - reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number), - "Received new notification" - ); - this.wal.commit(¬ification)?; + if let Poll::Ready(Some((source, notification))) = this.handle_rx.poll_recv(cx) { + let committed_tip = notification.committed_chain().map(|chain| chain.tip().number); + let reverted_tip = notification.reverted_chain().map(|chain| chain.tip().number); + debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Received new notification"); + + // Commit to WAL only notifications from blockchain tree. Pipeline notifications + // always contain only finalized blocks. + match source { + ExExNotificationSource::BlockchainTree => { + debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Committing notification to WAL"); + this.wal.commit(¬ification)?; + } + ExExNotificationSource::Pipeline => { + debug!(target: "exex::manager", ?committed_tip, ?reverted_tip, "Notification was sent from pipeline, skipping WAL commit"); + } + } + this.push_notification(notification); continue } @@ -491,7 +512,7 @@ where #[derive(Debug)] pub struct ExExManagerHandle { /// Channel to send notifications to the `ExEx` manager. - exex_tx: UnboundedSender, + exex_tx: UnboundedSender<(ExExNotificationSource, ExExNotification)>, /// The number of `ExEx`'s running on the node. num_exexs: usize, /// A watch channel denoting whether the manager is ready for new notifications or not. @@ -533,8 +554,12 @@ impl ExExManagerHandle { /// Synchronously send a notification over the channel to all execution extensions. /// /// Senders should call [`Self::has_capacity`] first. - pub fn send(&self, notification: ExExNotification) -> Result<(), SendError> { - self.exex_tx.send(notification) + pub fn send( + &self, + source: ExExNotificationSource, + notification: ExExNotification, + ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { + self.exex_tx.send((source, notification)) } /// Asynchronously send a notification over the channel to all execution extensions. @@ -543,10 +568,11 @@ impl ExExManagerHandle { /// capacity in the channel, the future will wait. pub async fn send_async( &mut self, + source: ExExNotificationSource, notification: ExExNotification, - ) -> Result<(), SendError> { + ) -> Result<(), SendError<(ExExNotificationSource, ExExNotification)>> { self.ready().await; - self.exex_tx.send(notification) + self.exex_tx.send((source, notification)) } /// Get the current capacity of the `ExEx` manager's internal notification buffer. @@ -610,16 +636,16 @@ impl Clone for ExExManagerHandle { mod tests { use super::*; use alloy_primitives::B256; - use futures::StreamExt; + use futures::{StreamExt, TryStreamExt}; use rand::Rng; use reth_db_common::init::init_genesis; use reth_evm_ethereum::execute::EthExecutorProvider; use reth_primitives::SealedBlockWithSenders; use reth_provider::{ providers::BlockchainProvider2, test_utils::create_test_provider_factory, BlockReader, - Chain, TransactionVariant, + BlockWriter, Chain, DatabaseProviderFactory, TransactionVariant, }; - use reth_testing_utils::generators; + use reth_testing_utils::generators::{self, random_block, BlockParams}; fn empty_finalized_header_stream() -> ForkChoiceStream { let (tx, rx) = watch::channel(None); @@ -959,9 +985,21 @@ mod tests { }; // Send notifications to go over the max capacity - exex_manager.handle.exex_tx.send(notification.clone()).unwrap(); - exex_manager.handle.exex_tx.send(notification.clone()).unwrap(); - exex_manager.handle.exex_tx.send(notification).unwrap(); + exex_manager + .handle + .exex_tx + .send((ExExNotificationSource::BlockchainTree, notification.clone())) + .unwrap(); + exex_manager + .handle + .exex_tx + .send((ExExNotificationSource::BlockchainTree, notification.clone())) + .unwrap(); + exex_manager + .handle + .exex_tx + .send((ExExNotificationSource::BlockchainTree, notification)) + .unwrap(); // Pin the ExExManager to call the poll method let mut pinned_manager = std::pin::pin!(exex_manager); @@ -1177,6 +1215,18 @@ mod tests { .sealed_block_with_senders(genesis_hash.into(), TransactionVariant::NoHash) .unwrap() .ok_or_else(|| eyre::eyre!("genesis block not found"))?; + + let block = random_block( + &mut rng, + genesis_block.number + 1, + BlockParams { parent: Some(genesis_hash), ..Default::default() }, + ) + .seal_with_senders() + .unwrap(); + let provider_rw = provider_factory.database_provider_rw().unwrap(); + provider_rw.insert_block(block.clone()).unwrap(); + provider_rw.commit().unwrap(); + let provider = BlockchainProvider2::new(provider_factory).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); @@ -1190,33 +1240,49 @@ mod tests { wal.handle(), ); - let notification = ExExNotification::ChainCommitted { + let genesis_notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new(vec![genesis_block.clone()], Default::default(), None)), }; + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), + }; let (finalized_headers_tx, rx) = watch::channel(None); + finalized_headers_tx.send(Some(genesis_block.header.clone()))?; let finalized_header_stream = ForkChoiceStream::new(rx); let mut exex_manager = std::pin::pin!(ExExManager::new( provider, vec![exex_handle], - 1, + 2, wal, finalized_header_stream )); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - exex_manager.handle().send(notification.clone())?; + exex_manager + .handle() + .send(ExExNotificationSource::Pipeline, genesis_notification.clone())?; + exex_manager.handle().send(ExExNotificationSource::BlockchainTree, notification.clone())?; assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); - assert_eq!(notifications.next().await.unwrap().unwrap(), notification.clone()); + assert_eq!( + notifications.try_poll_next_unpin(&mut cx)?, + Poll::Ready(Some(genesis_notification)) + ); + assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); + assert_eq!( + notifications.try_poll_next_unpin(&mut cx)?, + Poll::Ready(Some(notification.clone())) + ); + // WAL shouldn't contain the genesis notification, because it's finalized assert_eq!( exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] ); - finalized_headers_tx.send(Some(genesis_block.header.clone()))?; + finalized_headers_tx.send(Some(block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event assert_eq!( @@ -1229,7 +1295,7 @@ mod tests { .send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into())) .unwrap(); - finalized_headers_tx.send(Some(genesis_block.header.clone()))?; + finalized_headers_tx.send(Some(block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a // non-canonical block @@ -1239,12 +1305,12 @@ mod tests { ); // Send a `FinishedHeight` event with a canonical block - events_tx.send(ExExEvent::FinishedHeight(genesis_block.num_hash())).unwrap(); + events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap(); - finalized_headers_tx.send(Some(genesis_block.header.clone()))?; + finalized_headers_tx.send(Some(block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); // WAL is finalized - assert!(exex_manager.wal.iter_notifications()?.next().is_none()); + assert_eq!(exex_manager.wal.iter_notifications()?.next().transpose()?, None); Ok(()) } diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 5441f393d3..a3640690c1 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -6,7 +6,8 @@ use futures::future; use reth_chain_state::ForkChoiceSubscriptions; use reth_chainspec::EthChainSpec; use reth_exex::{ - ExExContext, ExExHandle, ExExManager, ExExManagerHandle, Wal, DEFAULT_EXEX_MANAGER_CAPACITY, + ExExContext, ExExHandle, ExExManager, ExExManagerHandle, ExExNotificationSource, Wal, + DEFAULT_EXEX_MANAGER_CAPACITY, }; use reth_node_api::{FullNodeComponents, NodeTypes}; use reth_primitives::Head; @@ -47,6 +48,7 @@ impl ExExLauncher { return Ok(None) } + info!(target: "reth::cli", "Loading ExEx Write-Ahead Log..."); let exex_wal = Wal::new( config_container .config @@ -127,7 +129,7 @@ impl ExExLauncher { async move { while let Ok(notification) = canon_state_notifications.recv().await { handle - .send_async(notification.into()) + .send_async(ExExNotificationSource::BlockchainTree, notification.into()) .await .expect("blockchain tree notification could not be sent to exex manager"); } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index df234e542b..7bb6ebc59e 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -9,7 +9,7 @@ use reth_evm::{ metrics::ExecutorMetrics, }; use reth_execution_types::{Chain, ExecutionOutcome}; -use reth_exex::{ExExManagerHandle, ExExNotification}; +use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; use reth_primitives::{Header, SealedHeader, StaticFileSegment}; use reth_primitives_traits::format_gas_throughput; use reth_provider::{ @@ -389,9 +389,10 @@ where // NOTE: We can ignore the error here, since an error means that the channel is closed, // which means the manager has died, which then in turn means the node is shutting down. - let _ = self - .exex_manager_handle - .send(ExExNotification::ChainCommitted { new: Arc::new(chain) }); + let _ = self.exex_manager_handle.send( + ExExNotificationSource::Pipeline, + ExExNotification::ChainCommitted { new: Arc::new(chain) }, + ); Ok(()) } @@ -477,8 +478,10 @@ where // NOTE: We can ignore the error here, since an error means that the channel is closed, // which means the manager has died, which then in turn means the node is shutting down. - let _ = - self.exex_manager_handle.send(ExExNotification::ChainReverted { old: Arc::new(chain) }); + let _ = self.exex_manager_handle.send( + ExExNotificationSource::Pipeline, + ExExNotification::ChainReverted { old: Arc::new(chain) }, + ); Ok(()) }