diff --git a/crates/engine/tree/src/database.rs b/crates/engine/tree/src/database.rs deleted file mode 100644 index b7481fb0a9..0000000000 --- a/crates/engine/tree/src/database.rs +++ /dev/null @@ -1,262 +0,0 @@ -#![allow(dead_code)] - -use crate::static_files::{StaticFileAction, StaticFileServiceHandle}; -use reth_chain_state::ExecutedBlock; -use reth_db::database::Database; -use reth_errors::ProviderResult; -use reth_primitives::B256; -use reth_provider::{ - writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter, - OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, -}; -use reth_prune::{Pruner, PrunerOutput}; -use reth_stages_types::{StageCheckpoint, StageId}; -use std::sync::mpsc::{Receiver, SendError, Sender}; -use tokio::sync::oneshot; -use tracing::debug; - -/// Writes parts of reth's in memory tree state to the database. -/// -/// This is meant to be a spawned service that listens for various incoming database operations, -/// performing those actions on disk, and returning the result in a channel. -/// -/// There are two types of operations this service can perform: -/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted. -/// - Removing blocks from disk, returning the hash of the lowest block removed. -/// -/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs -/// blocking database operations in an endless loop. -#[derive(Debug)] -pub struct DatabaseService { - /// The db / static file provider to use - provider: ProviderFactory, - /// Incoming requests to persist stuff - incoming: Receiver, - /// Handle for the static file service. - static_file_handle: StaticFileServiceHandle, - /// The pruner - pruner: Pruner>, -} - -impl DatabaseService { - /// Create a new database service. - /// - /// NOTE: The [`ProviderFactory`] and [`Pruner`] should be built using an identical - /// [`PruneModes`](reth_prune::PruneModes). - pub const fn new( - provider: ProviderFactory, - incoming: Receiver, - static_file_handle: StaticFileServiceHandle, - pruner: Pruner>, - ) -> Self { - Self { provider, incoming, static_file_handle, pruner } - } - - /// Writes the cloned tree state to the database - fn write(&self, blocks: Vec) -> ProviderResult<()> { - let provider_rw = self.provider.provider_rw()?; - - if blocks.is_empty() { - debug!(target: "tree::persistence::db", "Attempted to write empty block range"); - return Ok(()) - } - - let first_number = blocks.first().unwrap().block().number; - - let last = blocks.last().unwrap().block(); - let last_block_number = last.number; - - // TODO: remove all the clones and do performant / batched writes for each type of object - // instead of a loop over all blocks, - // meaning: - // * blocks - // * state - // * hashed state - // * trie updates (cannot naively extend, need helper) - // * indices (already done basically) - // Insert the blocks - for block in blocks { - let sealed_block = - block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); - provider_rw.insert_block(sealed_block)?; - - // Write state and changesets to the database. - // Must be written after blocks because of the receipt lookup. - let execution_outcome = block.execution_outcome().clone(); - // TODO: use single storage writer in task when sf / db tasks are combined - execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?; - - // insert hashes and intermediate merkle nodes - { - let trie_updates = block.trie_updates().clone(); - let hashed_state = block.hashed_state(); - // TODO: use single storage writer in task when sf / db tasks are combined - let storage_writer = StorageWriter::new(Some(&provider_rw), None); - storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?; - trie_updates.write_to_database(provider_rw.tx_ref())?; - } - - // update history indices - provider_rw.update_history_indices(first_number..=last_block_number)?; - - // Update pipeline progress - provider_rw.update_pipeline_stages(last_block_number, false)?; - } - - debug!(target: "tree::persistence::db", range = ?first_number..=last_block_number, "Appended blocks"); - - Ok(()) - } - - /// Removes block data above the given block number from the database. - /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove - /// `block_number`. - /// - /// This will then send a command to the static file service, to remove the actual block data. - fn remove_blocks_above( - &self, - block_number: u64, - sender: oneshot::Sender<()>, - ) -> ProviderResult<()> { - let provider_rw = self.provider.provider_rw()?; - let highest_block = self.provider.last_block_number()?; - provider_rw.remove_block_and_execution_range(block_number..=highest_block)?; - - // send a command to the static file service to also remove blocks - let _ = self - .static_file_handle - .send_action(StaticFileAction::RemoveBlocksAbove((block_number, sender))); - Ok(()) - } - - /// Prunes block data before the given block hash according to the configured prune - /// configuration. - fn prune_before(&mut self, block_num: u64) -> PrunerOutput { - // TODO: doing this properly depends on pruner segment changes - self.pruner.run(block_num).expect("todo: handle errors") - } - - /// Updates checkpoints related to block headers and bodies. This should be called by the static - /// file service, after new transactions have been successfully written to disk. - fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { - let provider_rw = self.provider.provider_rw()?; - provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; - provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; - provider_rw.commit()?; - Ok(()) - } -} - -impl DatabaseService -where - DB: Database, -{ - /// This is the main loop, that will listen to database events and perform the requested - /// database actions - pub fn run(mut self) { - // If the receiver errors then senders have disconnected, so the loop should then end. - while let Ok(action) = self.incoming.recv() { - match action { - DatabaseAction::RemoveBlocksAbove((new_tip_num, sender)) => { - self.remove_blocks_above(new_tip_num, sender).expect("todo: handle errors"); - } - DatabaseAction::SaveBlocks((blocks, sender)) => { - if blocks.is_empty() { - todo!("return error or something"); - } - let last_block_hash = blocks.last().unwrap().block().hash(); - self.write(blocks).unwrap(); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(last_block_hash); - } - DatabaseAction::PruneBefore((block_num, sender)) => { - let res = self.prune_before(block_num); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(res); - } - DatabaseAction::UpdateTransactionMeta((block_num, sender)) => { - self.update_transaction_meta(block_num).expect("todo: handle errors"); - - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(()); - } - } - } - } -} - -/// A signal to the database service that part of the tree state can be persisted. -#[derive(Debug)] -pub enum DatabaseAction { - /// The section of tree state that should be persisted. These blocks are expected in order of - /// increasing block number. - /// - /// This should just store the execution history-related data. Header, transaction, and - /// receipt-related data should already be written to static files. - SaveBlocks((Vec, oneshot::Sender)), - - /// Updates checkpoints related to block headers and bodies. This should be called by the - /// static file service, after new transactions have been successfully written to disk. - UpdateTransactionMeta((u64, oneshot::Sender<()>)), - - /// Removes block data above the given block number from the database. - /// - /// This will then send a command to the static file service, to remove the actual block data. - RemoveBlocksAbove((u64, oneshot::Sender<()>)), - - /// Prune associated block data before the given block number, according to already-configured - /// prune modes. - PruneBefore((u64, oneshot::Sender)), -} - -/// A handle to the database service -#[derive(Debug, Clone)] -pub struct DatabaseServiceHandle { - /// The channel used to communicate with the database service - sender: Sender, -} - -impl DatabaseServiceHandle { - /// Create a new [`DatabaseServiceHandle`] from a [`Sender`]. - pub const fn new(sender: Sender) -> Self { - Self { sender } - } - - /// Sends a specific [`DatabaseAction`] in the contained channel. The caller is responsible - /// for creating any channels for the given action. - pub fn send_action(&self, action: DatabaseAction) -> Result<(), SendError> { - self.sender.send(action) - } - - /// Tells the database service to save a certain list of finalized blocks. The blocks are - /// assumed to be ordered by block number. - /// - /// This returns the latest hash that has been saved, allowing removal of that block and any - /// previous blocks from in-memory data structures. - pub async fn save_blocks(&self, blocks: Vec) -> B256 { - let (tx, rx) = oneshot::channel(); - self.sender.send(DatabaseAction::SaveBlocks((blocks, tx))).expect("should be able to send"); - rx.await.expect("todo: err handling") - } - - /// Tells the database service to remove blocks above a certain block number. - pub async fn remove_blocks_above(&self, block_num: u64) { - let (tx, rx) = oneshot::channel(); - self.sender - .send(DatabaseAction::RemoveBlocksAbove((block_num, tx))) - .expect("should be able to send"); - rx.await.expect("todo: err handling") - } - - /// Tells the database service to remove block data before the given hash, according to the - /// configured prune config. - pub async fn prune_before(&self, block_num: u64) -> PrunerOutput { - let (tx, rx) = oneshot::channel(); - self.sender - .send(DatabaseAction::PruneBefore((block_num, tx))) - .expect("should be able to send"); - rx.await.expect("todo: err handling") - } -} diff --git a/crates/engine/tree/src/lib.rs b/crates/engine/tree/src/lib.rs index b4ac74992c..d238bf879c 100644 --- a/crates/engine/tree/src/lib.rs +++ b/crates/engine/tree/src/lib.rs @@ -20,18 +20,14 @@ pub use reth_blockchain_tree_api::*; pub mod backfill; /// The type that drives the chain forward. pub mod chain; -/// The background writer service for batch db writes. -pub mod database; /// Support for downloading blocks on demand for live sync. pub mod download; /// Engine Api chain handler support. pub mod engine; /// Metrics support. pub mod metrics; -/// The background writer service, coordinating the static file and database services. +/// The background writer service, coordinating write operations on static files and the database. pub mod persistence; -/// The background writer service for static file writes. -pub mod static_files; /// Support for interacting with the blockchain tree. pub mod tree; diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 394c1f92e7..f6765386e4 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,35 +1,328 @@ #![allow(dead_code)] -use crate::{ - database::{DatabaseAction, DatabaseService, DatabaseServiceHandle}, - static_files::{StaticFileAction, StaticFileService, StaticFileServiceHandle}, -}; use reth_chain_state::ExecutedBlock; use reth_db::Database; -use reth_primitives::{SealedBlock, B256, U256}; -use reth_provider::ProviderFactory; +use reth_errors::ProviderResult; +use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; +use reth_provider::{ + writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter, + OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, + StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, +}; use reth_prune::{Pruner, PrunerOutput}; +use reth_stages_types::{StageCheckpoint, StageId}; use std::sync::{ - mpsc::{SendError, Sender}, + mpsc::{Receiver, SendError, Sender}, Arc, }; use tokio::sync::oneshot; +use tracing::debug; -/// A signal to the database and static file services that part of the tree state can be persisted. +/// Writes parts of reth's in memory tree state to the database and static files. +/// +/// This is meant to be a spawned service that listens for various incoming persistence operations, +/// performing those actions on disk, and returning the result in a channel. +/// +/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs +/// blocking I/O operations in an endless loop. +#[derive(Debug)] +pub struct PersistenceService { + /// The provider factory to use + provider: ProviderFactory, + /// Incoming requests + incoming: Receiver, + /// The pruner + pruner: Pruner>, +} + +impl PersistenceService { + /// Create a new persistence service + pub const fn new( + provider: ProviderFactory, + incoming: Receiver, + pruner: Pruner>, + ) -> Self { + Self { provider, incoming, pruner } + } + + /// Writes the cloned tree state to database + fn write(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { + debug!(target: "tree::persistence", "Writing blocks to database"); + let provider_rw = self.provider.provider_rw()?; + + if blocks.is_empty() { + debug!(target: "tree::persistence", "Attempted to write empty block range"); + return Ok(()) + } + + let first_number = blocks.first().unwrap().block().number; + + let last = blocks.last().unwrap().block(); + let last_block_number = last.number; + + // TODO: remove all the clones and do performant / batched writes for each type of object + // instead of a loop over all blocks, + // meaning: + // * blocks + // * state + // * hashed state + // * trie updates (cannot naively extend, need helper) + // * indices (already done basically) + // Insert the blocks + for block in blocks { + let sealed_block = + block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap(); + provider_rw.insert_block(sealed_block)?; + + // Write state and changesets to the database. + // Must be written after blocks because of the receipt lookup. + let execution_outcome = block.execution_outcome().clone(); + // TODO: do we provide a static file producer here? + execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?; + + // insert hashes and intermediate merkle nodes + { + let trie_updates = block.trie_updates().clone(); + let hashed_state = block.hashed_state(); + // TODO: use single storage writer in task when sf / db tasks are combined + let storage_writer = StorageWriter::new(Some(&provider_rw), None); + storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?; + trie_updates.write_to_database(provider_rw.tx_ref())?; + } + + // update history indices + provider_rw.update_history_indices(first_number..=last_block_number)?; + + // Update pipeline progress + provider_rw.update_pipeline_stages(last_block_number, false)?; + } + + debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data"); + + Ok(()) + } + + /// Removes block data above the given block number from the database. + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// This will then send a command to the static file service, to remove the actual block data. + fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { + debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number"); + let provider_rw = self.provider.provider_rw()?; + let highest_block = self.provider.last_block_number()?; + provider_rw.remove_block_and_execution_range(block_number..=highest_block)?; + + Ok(()) + } + + /// Prunes block data before the given block hash according to the configured prune + /// configuration. + fn prune_before(&mut self, block_num: u64) -> PrunerOutput { + debug!(target: "tree::persistence", ?block_num, "Running pruner"); + // TODO: doing this properly depends on pruner segment changes + self.pruner.run(block_num).expect("todo: handle errors") + } + + /// Updates checkpoints related to block headers and bodies. This should be called after new + /// transactions have been successfully written to disk. + fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { + debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing"); + let provider_rw = self.provider.provider_rw()?; + provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; + provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; + provider_rw.commit()?; + Ok(()) + } + + // TODO: perform read for td and start + /// Writes the transactions to static files, to act as a log. + /// + /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called + /// after this, to update the checkpoints for headers and block bodies. + fn log_transactions( + &self, + block: Arc, + start_tx_number: u64, + td: U256, + ) -> ProviderResult { + debug!(target: "tree::persistence", ?td, ?start_tx_number, "Logging transactions"); + let provider = self.provider.static_file_provider(); + let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; + let mut transactions_writer = + provider.get_writer(block.number, StaticFileSegment::Transactions)?; + + header_writer.append_header(block.header(), td, &block.hash())?; + let no_hash_transactions = + block.body.clone().into_iter().map(TransactionSignedNoHash::from); + + let mut tx_number = start_tx_number; + for tx in no_hash_transactions { + transactions_writer.append_transaction(tx_number, &tx)?; + tx_number += 1; + } + + // increment block for transactions + transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?; + + // finally commit + transactions_writer.commit()?; + header_writer.commit()?; + + Ok(block.number) + } + + /// Write execution-related block data to static files. + /// + /// This will then send a command to the db service, that it should write new data, and update + /// the checkpoints for execution and beyond. + fn write_execution_data(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { + if blocks.is_empty() { + return Ok(()) + } + let provider_rw = self.provider.provider_rw()?; + let provider = self.provider.static_file_provider(); + + // NOTE: checked non-empty above + let first_block = blocks.first().unwrap().block(); + let last_block = blocks.last().unwrap().block().clone(); + + // use the storage writer + let current_block = first_block.number; + debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files"); + + let receipts_writer = + provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; + + let storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer)); + let receipts_iter = blocks.iter().map(|block| { + let receipts = block.execution_outcome().receipts().receipt_vec.clone(); + debug_assert!(receipts.len() == 1); + receipts.first().unwrap().clone() + }); + storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?; + + Ok(()) + } + + /// Removes the blocks above the given block number from static files. Also removes related + /// receipt and header data. + /// + /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove + /// `block_number`. + /// + /// Returns the block hash for the lowest block removed from the database, which should be + /// the hash for `block_number + 1`. + /// + /// This is meant to be called by the db service, as this should only be done after related data + /// is removed from the database, and checkpoints are updated. + /// + /// Returns the hash of the lowest removed block. + fn remove_static_file_blocks_above(&self, block_number: u64) -> ProviderResult<()> { + debug!(target: "tree::persistence", ?block_number, "Removing static file blocks above block_number"); + let sf_provider = self.provider.static_file_provider(); + let db_provider_rw = self.provider.provider_rw()?; + + // get highest static file block for the total block range + let highest_static_file_block = sf_provider + .get_highest_static_file_block(StaticFileSegment::Headers) + .expect("todo: error handling, headers should exist"); + + // Get the total txs for the block range, so we have the correct number of columns for + // receipts and transactions + let tx_range = db_provider_rw + .transaction_range_by_block_range(block_number..=highest_static_file_block)?; + let total_txs = tx_range.end().saturating_sub(*tx_range.start()); + + // get the writers + let mut header_writer = sf_provider.get_writer(block_number, StaticFileSegment::Headers)?; + let mut transactions_writer = + sf_provider.get_writer(block_number, StaticFileSegment::Transactions)?; + let mut receipts_writer = + sf_provider.get_writer(block_number, StaticFileSegment::Receipts)?; + + // finally actually truncate, these internally commit + receipts_writer.prune_receipts(total_txs, block_number)?; + transactions_writer.prune_transactions(total_txs, block_number)?; + header_writer.prune_headers(highest_static_file_block.saturating_sub(block_number))?; + + sf_provider.commit()?; + + Ok(()) + } +} + +impl PersistenceService +where + DB: Database, +{ + /// This is the main loop, that will listen to database events and perform the requested + /// database actions + pub fn run(mut self) { + // If the receiver errors then senders have disconnected, so the loop should then end. + while let Ok(action) = self.incoming.recv() { + match action { + PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => { + self.remove_blocks_above(new_tip_num).expect("todo: handle errors"); + self.remove_static_file_blocks_above(new_tip_num).expect("todo: handle errors"); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(()); + } + PersistenceAction::SaveBlocks((blocks, sender)) => { + if blocks.is_empty() { + todo!("return error or something"); + } + let last_block_hash = blocks.last().unwrap().block().hash(); + // first write to static files + self.write_execution_data(&blocks).expect("todo: handle errors"); + // then write to db + self.write(&blocks).expect("todo: handle errors"); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(last_block_hash); + } + PersistenceAction::PruneBefore((block_num, sender)) => { + let res = self.prune_before(block_num); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(res); + } + PersistenceAction::LogTransactions((block, start_tx_number, td, sender)) => { + let block_num = self + .log_transactions(block, start_tx_number, td) + .expect("todo: handle errors"); + self.update_transaction_meta(block_num).expect("todo: handle errors"); + + // we ignore the error because the caller may or may not care about the result + let _ = sender.send(()); + } + } + } + } +} + +/// A signal to the persistence service that part of the tree state can be persisted. #[derive(Debug)] pub enum PersistenceAction { - /// The given block has been added to the canonical chain, its transactions and headers will be - /// persisted for durability. - LogTransactions((Arc, u64, U256, oneshot::Sender<()>)), - /// The section of tree state that should be persisted. These blocks are expected in order of /// increasing block number. /// - /// This should just store the execution history-related data. Header, transaction, and - /// receipt-related data should already be written to static files. + /// First, header, transaction, and receipt-related data should be written to static files. + /// Then the execution history-related data will be written to the database. SaveBlocks((Vec, oneshot::Sender)), + /// The given block has been added to the canonical chain, its transactions and headers will be + /// persisted for durability. + /// + /// This will first append the header and transactions to static files, then update the + /// checkpoints for headers and block bodies in the database. + LogTransactions((Arc, u64, U256, oneshot::Sender<()>)), + /// Removes block data above the given block number from the database. + /// + /// This will first update checkpoints from the database, then remove actual block data from + /// static files. RemoveBlocksAbove((u64, oneshot::Sender<()>)), /// Prune associated block data before the given block number, according to already-configured @@ -37,111 +330,47 @@ pub enum PersistenceAction { PruneBefore((u64, oneshot::Sender)), } -/// An error type for when there is a [`SendError`] while sending an action to one of the services. -#[derive(Debug)] -pub enum PersistenceSendError { - /// When there is an error sending to the static file service - StaticFile(SendError), - /// When there is an error sending to the database service - Database(SendError), -} - -impl From> for PersistenceSendError { - fn from(value: SendError) -> Self { - Self::StaticFile(value) - } -} - -impl From> for PersistenceSendError { - fn from(value: SendError) -> Self { - Self::Database(value) - } -} - -/// A handle to the database and static file services. This will send commands to the correct -/// service, depending on the command. -/// -/// Some commands should be sent to the database service, and others should be sent to the static -/// file service, despite having the same name. This is because some actions require work to be done -/// by both the static file _and_ the database service, and require some coordination. -/// -/// This type is what actually coordinates the two services, and should be used by consumers of the -/// persistence related services. +/// A handle to the persistence service #[derive(Debug, Clone)] pub struct PersistenceHandle { - /// The channel used to communicate with the database service - db_sender: Sender, - /// The channel used to communicate with the static file service - static_file_sender: Sender, + /// The channel used to communicate with the persistence service + sender: Sender, } impl PersistenceHandle { /// Create a new [`PersistenceHandle`] from a [`Sender`]. - pub const fn new( - db_sender: Sender, - static_file_sender: Sender, - ) -> Self { - Self { db_sender, static_file_sender } + pub const fn new(sender: Sender) -> Self { + Self { sender } } - /// Create a new [`PersistenceHandle`], and spawn the database and static file services. + /// Create a new [`PersistenceHandle`], and spawn the persistence service. pub fn spawn_services( provider_factory: ProviderFactory, pruner: Pruner>, ) -> Self { // create the initial channels - let (static_file_service_tx, static_file_service_rx) = std::sync::mpsc::channel(); let (db_service_tx, db_service_rx) = std::sync::mpsc::channel(); // construct persistence handle - let persistence_handle = Self::new(db_service_tx.clone(), static_file_service_tx.clone()); + let persistence_handle = Self::new(db_service_tx); - // construct handles for the services to talk to each other - let static_file_handle = StaticFileServiceHandle::new(static_file_service_tx); - let database_handle = DatabaseServiceHandle::new(db_service_tx); - - // spawn the db service - let db_service = DatabaseService::new( - provider_factory.clone(), - db_service_rx, - static_file_handle, - pruner, - ); + // spawn the persistence service + let db_service = PersistenceService::new(provider_factory, db_service_rx, pruner); std::thread::Builder::new() - .name("Database Service".to_string()) + .name("Persistence Service".to_string()) .spawn(|| db_service.run()) .unwrap(); - // spawn the static file service - let static_file_service = - StaticFileService::new(provider_factory, static_file_service_rx, database_handle); - std::thread::Builder::new() - .name("Static File Service".to_string()) - .spawn(|| static_file_service.run()) - .unwrap(); - persistence_handle } /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible /// for creating any channels for the given action. - pub fn send_action(&self, action: PersistenceAction) -> Result<(), PersistenceSendError> { - match action { - PersistenceAction::LogTransactions(input) => self - .static_file_sender - .send(StaticFileAction::LogTransactions(input)) - .map_err(From::from), - PersistenceAction::SaveBlocks(input) => self - .static_file_sender - .send(StaticFileAction::WriteExecutionData(input)) - .map_err(From::from), - PersistenceAction::RemoveBlocksAbove(input) => { - self.db_sender.send(DatabaseAction::RemoveBlocksAbove(input)).map_err(From::from) - } - PersistenceAction::PruneBefore(input) => { - self.db_sender.send(DatabaseAction::PruneBefore(input)).map_err(From::from) - } - } + pub fn send_action( + &self, + action: PersistenceAction, + ) -> Result<(), SendError> { + self.sender.send(action) } /// Tells the persistence service to save a certain list of finalized blocks. The blocks are @@ -207,6 +436,7 @@ mod tests { #[tokio::test] async fn test_save_blocks_empty() { + reth_tracing::init_test_tracing(); let persistence_handle = default_persistence_handle(); let blocks = vec![]; @@ -220,6 +450,7 @@ mod tests { #[tokio::test] async fn test_save_blocks_single_block() { + reth_tracing::init_test_tracing(); let persistence_handle = default_persistence_handle(); let block_number = 0; let executed = get_executed_block_with_number(block_number); @@ -236,6 +467,7 @@ mod tests { #[tokio::test] async fn test_save_blocks_multiple_blocks() { + reth_tracing::init_test_tracing(); let persistence_handle = default_persistence_handle(); let blocks = get_executed_blocks(0..5).collect::>(); @@ -250,6 +482,7 @@ mod tests { #[tokio::test] async fn test_save_blocks_multiple_calls() { + reth_tracing::init_test_tracing(); let persistence_handle = default_persistence_handle(); let ranges = [0..1, 1..2, 2..4, 4..5]; diff --git a/crates/engine/tree/src/static_files.rs b/crates/engine/tree/src/static_files.rs deleted file mode 100644 index 2be3382278..0000000000 --- a/crates/engine/tree/src/static_files.rs +++ /dev/null @@ -1,269 +0,0 @@ -#![allow(dead_code)] - -use crate::database::{DatabaseAction, DatabaseServiceHandle}; -use reth_chain_state::ExecutedBlock; -use reth_db::database::Database; -use reth_errors::ProviderResult; -use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; -use reth_provider::{ - ProviderFactory, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, -}; -use std::sync::{ - mpsc::{Receiver, SendError, Sender}, - Arc, -}; -use tokio::sync::oneshot; - -/// Writes finalized blocks to reth's static files. -/// -/// This is meant to be a spawned service that listens for various incoming finalization operations, -/// and writing to or producing new static files. -/// -/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs -/// blocking file operations in an endless loop. -#[derive(Debug)] -pub struct StaticFileService { - /// The db / static file provider to use - provider: ProviderFactory, - /// Handle for the database service - database_handle: DatabaseServiceHandle, - /// Incoming requests to write static files - incoming: Receiver, -} - -impl StaticFileService -where - DB: Database + 'static, -{ - /// Create a new static file service. - pub const fn new( - provider: ProviderFactory, - incoming: Receiver, - database_handle: DatabaseServiceHandle, - ) -> Self { - Self { provider, database_handle, incoming } - } - - // TODO: some things about this are a bit weird, and just to make the underlying static file - // writes work - tx number, total difficulty inclusion. They require either additional in memory - // data or a db lookup. Maybe we can use a db read here - /// Writes the transactions to static files, to act as a log. - /// - /// This will then send a command to the db service, that it should update the checkpoints for - /// headers and block bodies. - fn log_transactions( - &self, - block: Arc, - start_tx_number: u64, - td: U256, - sender: oneshot::Sender<()>, - ) -> ProviderResult<()> { - let provider = self.provider.static_file_provider(); - let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; - let mut transactions_writer = - provider.get_writer(block.number, StaticFileSegment::Transactions)?; - - // TODO: does to_compact require ownership? - header_writer.append_header(block.header(), td, &block.hash())?; - let no_hash_transactions = - block.body.clone().into_iter().map(TransactionSignedNoHash::from); - - let mut tx_number = start_tx_number; - for tx in no_hash_transactions { - transactions_writer.append_transaction(tx_number, &tx)?; - tx_number += 1; - } - - // increment block for transactions - transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?; - - // finally commit - transactions_writer.commit()?; - header_writer.commit()?; - - // TODO: do we care about the mpsc error here? - // send a command to the db service to update the checkpoints for headers / bodies - let _ = self - .database_handle - .send_action(DatabaseAction::UpdateTransactionMeta((block.number, sender))); - - Ok(()) - } - - /// Write execution-related block data to static files. - /// - /// This will then send a command to the db service, that it should write new data, and update - /// the checkpoints for execution and beyond. - fn write_execution_data( - &self, - blocks: Vec, - sender: oneshot::Sender, - ) -> ProviderResult<()> { - if blocks.is_empty() { - return Ok(()) - } - let provider = self.provider.static_file_provider(); - - // NOTE: checked non-empty above - let first_block = blocks.first().unwrap().block(); - let last_block = blocks.last().unwrap().block(); - - // get highest receipt, if it returns none, use zero (this is the first static file write) - let mut current_receipt = provider - .get_highest_static_file_tx(StaticFileSegment::Receipts) - .map(|num| num + 1) - .unwrap_or_default(); - let mut current_block = first_block.number; - - let mut receipts_writer = - provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; - for receipts in blocks.iter().map(|block| block.execution_outcome().receipts.clone()) { - debug_assert!(receipts.len() == 1); - // TODO: should we also assert that the receipt is not None here, that means the - // receipt is pruned - for maybe_receipt in receipts.first().unwrap() { - if let Some(receipt) = maybe_receipt { - receipts_writer.append_receipt(current_receipt, receipt)?; - } - current_receipt += 1; - } - - // increment the block - receipts_writer.increment_block(StaticFileSegment::Receipts, current_block)?; - current_block += 1; - } - - // finally increment block and commit - receipts_writer.commit()?; - - // TODO: do we care about the mpsc error here? - // send a command to the db service to update the checkpoints for execution etc. - let _ = self.database_handle.send_action(DatabaseAction::SaveBlocks((blocks, sender))); - - Ok(()) - } - - /// Removes the blocks above the given block number from static files. Also removes related - /// receipt and header data. - /// - /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove - /// `block_number`. - /// - /// Returns the block hash for the lowest block removed from the database, which should be - /// the hash for `block_number + 1`. - /// - /// This is meant to be called by the db service, as this should only be done after related data - /// is removed from the database, and checkpoints are updated. - /// - /// Returns the hash of the lowest removed block. - fn remove_blocks_above( - &self, - block_num: u64, - sender: oneshot::Sender<()>, - ) -> ProviderResult<()> { - let sf_provider = self.provider.static_file_provider(); - let db_provider_rw = self.provider.provider_rw()?; - - // get highest static file block for the total block range - let highest_static_file_block = sf_provider - .get_highest_static_file_block(StaticFileSegment::Headers) - .expect("todo: error handling, headers should exist"); - - // Get the total txs for the block range, so we have the correct number of columns for - // receipts and transactions - let tx_range = db_provider_rw - .transaction_range_by_block_range(block_num..=highest_static_file_block)?; - let total_txs = tx_range.end().saturating_sub(*tx_range.start()); - - // get the writers - let mut header_writer = sf_provider.get_writer(block_num, StaticFileSegment::Headers)?; - let mut transactions_writer = - sf_provider.get_writer(block_num, StaticFileSegment::Transactions)?; - let mut receipts_writer = sf_provider.get_writer(block_num, StaticFileSegment::Receipts)?; - - // finally actually truncate, these internally commit - receipts_writer.prune_receipts(total_txs, block_num)?; - transactions_writer.prune_transactions(total_txs, block_num)?; - header_writer.prune_headers(highest_static_file_block.saturating_sub(block_num))?; - - sf_provider.commit()?; - - Ok(()) - } -} - -impl StaticFileService -where - DB: Database + 'static, -{ - /// This is the main loop, that will listen to static file actions, and write DB data to static - /// files. - pub fn run(self) { - // If the receiver errors then senders have disconnected, so the loop should then end. - while let Ok(action) = self.incoming.recv() { - match action { - StaticFileAction::LogTransactions(( - block, - start_tx_number, - td, - response_sender, - )) => { - self.log_transactions(block, start_tx_number, td, response_sender) - .expect("todo: handle errors"); - } - StaticFileAction::RemoveBlocksAbove((block_num, response_sender)) => { - self.remove_blocks_above(block_num, response_sender) - .expect("todo: handle errors"); - } - StaticFileAction::WriteExecutionData((blocks, response_sender)) => { - self.write_execution_data(blocks, response_sender) - .expect("todo: handle errors"); - } - } - } - } -} - -/// A signal to the static file service that some data should be copied from the DB to static files. -#[derive(Debug)] -pub enum StaticFileAction { - /// The given block has been added to the canonical chain, its transactions and headers will be - /// persisted for durability. - /// - /// This will then send a command to the db service, that it should update the checkpoints for - /// headers and block bodies. - LogTransactions((Arc, u64, U256, oneshot::Sender<()>)), - - /// Write execution-related block data to static files. - /// - /// This will then send a command to the db service, that it should write new data, and update - /// the checkpoints for execution and beyond. - WriteExecutionData((Vec, oneshot::Sender)), - - /// Removes the blocks above the given block number from static files. Also removes related - /// receipt and header data. - /// - /// This is meant to be called by the db service, as this should only be done after related - /// data is removed from the database, and checkpoints are updated. - RemoveBlocksAbove((u64, oneshot::Sender<()>)), -} - -/// A handle to the static file service -#[derive(Debug, Clone)] -pub struct StaticFileServiceHandle { - /// The channel used to communicate with the static file service - sender: Sender, -} - -impl StaticFileServiceHandle { - /// Create a new [`StaticFileServiceHandle`] from a [`Sender`]. - pub const fn new(sender: Sender) -> Self { - Self { sender } - } - - /// Sends a specific [`StaticFileAction`] in the contained channel. The caller is responsible - /// for creating any channels for the given action. - pub fn send_action(&self, action: StaticFileAction) -> Result<(), SendError> { - self.sender.send(action) - } -} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 3ad79d1b1b..9d28ed966d 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1357,7 +1357,7 @@ impl PersistenceState { #[cfg(test)] mod tests { use super::*; - use crate::static_files::StaticFileAction; + use crate::persistence::PersistenceAction; use reth_beacon_consensus::EthBeaconConsensus; use reth_chain_state::{test_utils::get_executed_blocks, BlockState}; use reth_chainspec::{ChainSpecBuilder, MAINNET}; @@ -1372,7 +1372,7 @@ mod tests { tree: EngineApiTreeHandlerImpl, to_tree_tx: Sender>>, blocks: Vec, - sf_action_rx: Receiver, + action_rx: Receiver, payload_command_rx: UnboundedReceiver>, } @@ -1395,8 +1395,7 @@ mod tests { let tree_state = TreeState { blocks_by_hash, blocks_by_number, ..Default::default() }; let (action_tx, action_rx) = channel(); - let (sf_action_tx, sf_action_rx) = channel(); - let persistence_handle = PersistenceHandle::new(action_tx, sf_action_tx); + let persistence_handle = PersistenceHandle::new(action_tx); let chain_spec = Arc::new( ChainSpecBuilder::default() @@ -1444,22 +1443,22 @@ mod tests { tree.canonical_in_memory_state = CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending); - TestHarness { tree, to_tree_tx, blocks, sf_action_rx, payload_command_rx } + TestHarness { tree, to_tree_tx, blocks, action_rx, payload_command_rx } } #[tokio::test] async fn test_tree_persist_blocks() { // we need more than PERSISTENCE_THRESHOLD blocks to trigger the // persistence task. - let TestHarness { tree, to_tree_tx, sf_action_rx, mut blocks, payload_command_rx } = + let TestHarness { tree, to_tree_tx, action_rx, mut blocks, payload_command_rx } = get_default_test_harness(PERSISTENCE_THRESHOLD + 1); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap(); // send a message to the tree to enter the main loop. to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap(); - let received_action = sf_action_rx.recv().expect("Failed to receive saved blocks"); - if let StaticFileAction::WriteExecutionData((saved_blocks, _)) = received_action { + let received_action = action_rx.recv().expect("Failed to receive saved blocks"); + if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action { // only PERSISTENCE_THRESHOLD will be persisted blocks.pop(); assert_eq!(saved_blocks.len() as u64, PERSISTENCE_THRESHOLD); @@ -1471,7 +1470,7 @@ mod tests { #[tokio::test] async fn test_in_memory_state_trait_impl() { - let TestHarness { tree, to_tree_tx, sf_action_rx, blocks, payload_command_rx } = + let TestHarness { tree, to_tree_tx, action_rx, blocks, payload_command_rx } = get_default_test_harness(10); let head_block = blocks.last().unwrap().block(); @@ -1494,7 +1493,7 @@ mod tests { #[tokio::test] async fn test_engine_request_during_backfill() { - let TestHarness { mut tree, to_tree_tx, sf_action_rx, blocks, payload_command_rx } = + let TestHarness { mut tree, to_tree_tx, action_rx, blocks, payload_command_rx } = get_default_test_harness(PERSISTENCE_THRESHOLD); // set backfill active diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 0223cbd074..2fd0931044 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -171,14 +171,28 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> { StorageType::StaticFile(self.static_file_writer()) }; + let mut last_tx_idx = None; for (idx, receipts) in blocks.enumerate() { let block_number = initial_block_number + idx as u64; - let first_tx_index = bodies_cursor - .seek_exact(block_number)? - .map(|(_, indices)| indices.first_tx_num()) + let mut first_tx_index = + bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num()); + + // If there are no indices, that means there have been no transactions + // + // So instead of returning an error, use zero + if block_number == initial_block_number && first_tx_index.is_none() { + first_tx_index = Some(0); + } + + // TODO: I guess this error will never be returned + let first_tx_index = first_tx_index + .or(last_tx_idx) .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?; + // update for empty blocks + last_tx_idx = Some(first_tx_index); + match &mut storage_type { StorageType::Database(cursor) => { DatabaseWriter(cursor).append_block_receipts(