diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 9678f24d45..99cbab79b9 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -5,9 +5,9 @@ use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database}; use reth_errors::ProviderResult; use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; use reth_provider::{ - writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter, - OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter, StateWriter, - StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter, + writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW, + HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter, + StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter, }; use reth_prune::{Pruner, PrunerOutput}; use reth_stages_types::{StageCheckpoint, StageId}; @@ -46,14 +46,17 @@ impl PersistenceService { } /// Writes the cloned tree state to database - fn write(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { + fn write( + &self, + blocks: &[ExecutedBlock], + provider_rw: &DatabaseProviderRW, + ) -> ProviderResult<()> { if blocks.is_empty() { debug!(target: "tree::persistence", "Attempted to write empty block range"); return Ok(()) } debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database"); - let provider_rw = self.provider.provider_rw()?; let first_number = blocks.first().unwrap().block().number; let last = blocks.last().unwrap().block(); @@ -77,7 +80,7 @@ impl PersistenceService { // Must be written after blocks because of the receipt lookup. let execution_outcome = block.execution_outcome().clone(); // TODO: do we provide a static file producer here? - let mut storage_writer = StorageWriter::new(Some(&provider_rw), None); + let mut storage_writer = StorageWriter::new(Some(provider_rw), None); storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?; // insert hashes and intermediate merkle nodes @@ -95,8 +98,6 @@ impl PersistenceService { // Update pipeline progress provider_rw.update_pipeline_stages(last_block_number, false)?; - provider_rw.commit()?; - debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data"); Ok(()) @@ -127,15 +128,18 @@ impl PersistenceService { /// Updates checkpoints related to block headers and bodies. This should be called after new /// transactions have been successfully written to disk. - fn update_transaction_meta(&self, block_num: u64, td: U256) -> ProviderResult<()> { + fn update_transaction_meta( + &self, + block_num: u64, + td: U256, + provider_rw: &DatabaseProviderRW, + ) -> ProviderResult<()> { debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing"); - let provider_rw = self.provider.provider_rw()?; provider_rw .tx_ref() .put::(block_num, CompactU256(td))?; provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; - provider_rw.commit()?; Ok(()) } @@ -145,15 +149,18 @@ impl PersistenceService { /// /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called /// after this, to update the checkpoints for headers and block bodies. - #[instrument(level = "trace", skip(self), target = "engine")] - fn write_transactions(&self, block: Arc) -> ProviderResult<(u64, U256)> { + #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")] + fn write_transactions( + &self, + block: Arc, + provider_rw: &DatabaseProviderRW, + ) -> ProviderResult<(u64, U256)> { debug!(target: "tree::persistence", "Writing transactions"); let provider = self.provider.static_file_provider(); let new_td = { let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; - let provider_ro = self.provider.provider()?; - let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(header_writer)); + let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer)); let new_td = storage_writer.append_headers_from_blocks( block.header().number, std::iter::once(&(block.header(), block.hash())), @@ -162,7 +169,7 @@ impl PersistenceService { let transactions_writer = provider.get_writer(block.number, StaticFileSegment::Transactions)?; let mut storage_writer = - StorageWriter::new(Some(&provider_ro), Some(transactions_writer)); + StorageWriter::new(Some(provider_rw), Some(transactions_writer)); let no_hash_transactions = block.body.clone().into_iter().map(TransactionSignedNoHash::from).collect(); storage_writer.append_transactions_from_blocks( @@ -173,20 +180,18 @@ impl PersistenceService { new_td }; - provider.commit()?; - Ok((block.number, new_td)) } - /// Write execution-related block data to static files. - /// - /// This will then send a command to the db service, that it should write new data, and update - /// the checkpoints for execution and beyond. - fn write_execution_data(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { + /// Write execution-related block data to database and/or static files. + fn write_execution_data( + &self, + blocks: &[ExecutedBlock], + provider_rw: &DatabaseProviderRW, + ) -> ProviderResult<()> { if blocks.is_empty() { return Ok(()) } - let provider_rw = self.provider.provider_rw()?; let provider = self.provider.static_file_provider(); // NOTE: checked non-empty above @@ -201,7 +206,7 @@ impl PersistenceService { provider.get_writer(first_block.number, StaticFileSegment::Receipts)?; { - let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer)); + let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer)); let receipts_iter = blocks.iter().map(|block| { let receipts = block.execution_outcome().receipts().receipt_vec.clone(); debug_assert!(receipts.len() == 1); @@ -210,9 +215,6 @@ impl PersistenceService { storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?; } - provider.commit()?; - provider_rw.commit()?; - Ok(()) } @@ -285,18 +287,23 @@ where todo!("return error or something"); } let last_block_hash = blocks.last().unwrap().block().hash(); - // first write to static files - self.write_execution_data(&blocks).expect("todo: handle errors"); - // then write to db - self.write(&blocks).expect("todo: handle errors"); + + let provider_rw = self.provider.provider_rw().expect("todo: handle errors"); + self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors"); + self.write(&blocks, &provider_rw).expect("todo: handle errors"); + for block in &blocks { // first write transactions let (block_num, td) = self - .write_transactions(block.block.clone()) + .write_transactions(block.block.clone(), &provider_rw) + .expect("todo: handle errors"); + self.update_transaction_meta(block_num, td, &provider_rw) .expect("todo: handle errors"); - self.update_transaction_meta(block_num, td).expect("todo: handle errors"); } + self.provider.static_file_provider().commit().expect("todo: handle errors"); + provider_rw.commit().expect("todo: handle errors"); + // we ignore the error because the caller may or may not care about the result let _ = sender.send(last_block_hash); } @@ -307,12 +314,13 @@ where let _ = sender.send(res); } PersistenceAction::WriteTransactions((block, sender)) => { - let (block_num, td) = - self.write_transactions(block).expect("todo: handle errors"); - self.update_transaction_meta(block_num, td).expect("todo: handle errors"); + unimplemented!() + // let (block_num, td) = + // self.write_transactions(block).expect("todo: handle errors"); + // self.update_transaction_meta(block_num, td).expect("todo: handle errors"); - // we ignore the error because the caller may or may not care about the result - let _ = sender.send(()); + // // we ignore the error because the caller may or may not care about the + // result let _ = sender.send(()); } } }