use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
    ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
    sync::mpsc::{Receiver, SendError, Sender},
    time::Instant,
};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};

/// Writes parts of reth's in-memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
/// performs those actions on disk, and returns the result in a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory to use
    provider: ProviderFactory<N>,
    /// Incoming requests
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// Metrics
    metrics: PersistenceMetrics,
    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// Create a new persistence service
    pub fn new(
        provider: ProviderFactory<N>,
        incoming: Receiver<PersistenceAction<N::Primitives>>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> Self {
        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
    }

    /// Prunes block data before the given block number according to the configured prune
    /// configuration.
    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
        debug!(target: "engine::persistence", ?block_num, "Running pruner");
        let start_time = Instant::now();
        // TODO: doing this properly depends on pruner segment changes
        let result = self.pruner.run(block_num);
        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
        result
    }
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// This is the main loop that listens for incoming persistence actions and performs the
    /// requested database operations.
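    ///
    /// A minimal sketch of how this loop is typically driven, mirroring
    /// [`PersistenceHandle::spawn_service`] below (variable names here are illustrative):
    ///
    /// ```ignore
    /// let (tx, rx) = std::sync::mpsc::channel();
    /// let service = PersistenceService::new(provider_factory, rx, pruner, sync_metrics_tx);
    /// // `run` blocks until every `Sender` has been dropped, so it gets a dedicated OS thread.
    /// std::thread::spawn(move || {
    ///     if let Err(err) = service.run() {
    ///         tracing::error!(?err, "persistence service failed");
    ///     }
    /// });
    /// ```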
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let result = self.on_remove_blocks_above(new_tip_num)?;
                    // send new sync metrics based on removed blocks
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    let result_number = result.map(|r| r.number);
                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);
                    if let Some(block_number) = result_number {
                        // send new sync metrics based on saved blocks
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });

                        if self.pruner.is_pruning_needed(block_number) {
                            // We log `PrunerOutput` inside the `Pruner`
                            let _ = self.prune_before(block_number)?;
                        }
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_finalized_block_number(finalized_block)?;
                    provider.commit()?;
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_safe_block_number(safe_block)?;
                    provider.commit()?;
                }
            }
        }
        Ok(())
    }

    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;
        let sf_provider = self.provider.static_file_provider();

        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
        UnifiedStorageWriter::commit_unwind(provider_rw)?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());

        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

    fn on_save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
        let start_time = Instant::now();
        let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
            hash: block.recovered_block().hash(),
            number: block.recovered_block().header().number(),
        });

        if last_block_hash_num.is_some() {
            let provider_rw = self.provider.database_provider_rw()?;
            let static_file_provider = self.provider.static_file_provider();

            UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?;
            UnifiedStorageWriter::commit(provider_rw)?;
        }
        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());

        Ok(last_block_hash_num)
    }
}

/// One of the errors that can happen when using the persistence service.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

/// A signal to the persistence service that part of the tree state can be persisted.
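///
/// A sketch of the request/response pattern from the caller's side (illustrative; assumes an
/// existing `handle` and `blocks`, and a tokio context for awaiting the oneshot receiver):
///
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel();
/// handle.send_action(PersistenceAction::SaveBlocks(blocks, tx))?;
/// // The service replies with the number and hash of the last block it wrote,
/// // or `None` if `blocks` was empty.
/// let saved: Option<BlockNumHash> = rx.await?;
/// ```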
#[derive(Debug)]
pub enum PersistenceAction<T: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<T>>, oneshot::Sender<Option<BlockNumHash>>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),

    /// Update the persisted finalized block on disk
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk
    SaveSafeBlock(u64),
}

/// A handle to the persistence service
#[derive(Debug, Clone)]
pub struct PersistenceHandle<T: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<T>>,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
    pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
        Self { sender }
    }

    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
    pub fn spawn_service<N>(
        provider_factory: ProviderFactory<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> PersistenceHandle<N::Primitives>
    where
        N: ProviderNodeTypes,
    {
        // create the initial channels
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // construct persistence handle
        let persistence_handle = PersistenceHandle::new(db_service_tx);

        // spawn the persistence service
        let db_service =
            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
        std::thread::Builder::new()
            .name("Persistence Service".to_string())
            .spawn(|| {
                if let Err(err) = db_service.run() {
                    error!(target: "engine::persistence", ?err, "Persistence service failed");
                }
            })
            .unwrap();

        persistence_handle
    }

    /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
    /// for creating any channels for the given action.
    pub fn send_action(
        &self,
        action: PersistenceAction<T>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.sender.send(action)
    }

    /// Tells the persistence service to save the given list of blocks. The blocks are
    /// assumed to be ordered by block number.
    ///
    /// This returns the latest hash that has been saved, allowing removal of that block and any
    /// previous blocks from in-memory data structures. This value is returned in the receiver end
    /// of the sender argument.
    ///
    /// If there are no blocks to persist, then `None` is sent in the sender.
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
        tx: oneshot::Sender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
    }

    /// Persists the finalized block number on disk.
    pub fn save_finalized_block_number(
        &self,
        finalized_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
    }

    /// Persists the safe block number on disk.
    pub fn save_safe_block_number(
        &self,
        safe_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
    }

    /// Tells the persistence service to remove blocks above a certain block number.
    ///
    /// When the operation completes, the new tip hash is returned in the receiver end of the
    /// sender argument.
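    ///
    /// A usage sketch (illustrative; assumes an existing `handle` and a tokio context):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::oneshot::channel();
    /// // Ask the service to unwind everything above block 100.
    /// handle.remove_blocks_above(100, tx).unwrap();
    /// // Resolves to the `BlockNumHash` of the new tip, if its hash is known.
    /// let new_tip: Option<BlockNumHash> = rx.await.unwrap();
    /// ```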
    pub fn remove_blocks_above(
        &self,
        block_num: u64,
        tx: oneshot::Sender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let hash = rx.await.unwrap();
        assert_eq!(hash, None);
    }

    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let BlockNumHash { hash: actual_hash, number: _ } =
            tokio::time::timeout(std::time::Duration::from_secs(10), rx)
                .await
                .expect("test timed out")
                .expect("channel closed unexpectedly")
                .expect("no hash returned");

        assert_eq!(block_hash, actual_hash);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
        assert_eq!(last_hash, actual_hash);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = oneshot::channel();

            persistence_handle.save_blocks(blocks, tx).unwrap();

            let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
            assert_eq!(last_hash, actual_hash);
        }
    }
}