From f3ce077c8aeb4bc98f0cdc2541ce8a365891e077 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Tue, 30 Jul 2024 18:17:20 -0400 Subject: [PATCH] fix: write transactions on save_blocks call (#9912) --- crates/engine/tree/src/persistence.rs | 38 ++++++++++++++++------- crates/storage/provider/src/writer/mod.rs | 10 +++--- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index a93963dd06..23be3e7d2b 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -1,9 +1,9 @@ #![allow(dead_code)] use reth_chain_state::ExecutedBlock; -use reth_db::Database; +use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database}; use reth_errors::ProviderResult; -use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256}; +use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256}; use reth_provider::{ writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter, @@ -16,7 +16,7 @@ use std::sync::{ Arc, }; use tokio::sync::oneshot; -use tracing::debug; +use tracing::{debug, instrument}; /// Writes parts of reth's in memory tree state to the database and static files. /// @@ -127,9 +127,12 @@ impl PersistenceService { /// Updates checkpoints related to block headers and bodies. This should be called after new /// transactions have been successfully written to disk. - fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> { + fn update_transaction_meta(&self, block_num: u64, td: U256) -> ProviderResult<()> { debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing"); let provider_rw = self.provider.provider_rw()?; + provider_rw + .tx_ref() + .put::(block_num, CompactU256(td))?; provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?; provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?; provider_rw.commit()?; @@ -138,17 +141,20 @@ impl PersistenceService { /// Writes the transactions to static files. /// + /// Returns the block number and new total difficulty. + /// /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called /// after this, to update the checkpoints for headers and block bodies. - fn write_transactions(&self, block: Arc) -> ProviderResult { + #[instrument(level = "trace", skip(self), target = "engine")] + fn write_transactions(&self, block: Arc) -> ProviderResult<(u64, U256)> { debug!(target: "tree::persistence", "Writing transactions"); let provider = self.provider.static_file_provider(); - { + let new_td = { let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?; let provider_ro = self.provider.provider()?; let mut storage_writer = StorageWriter::new(Some(&provider_ro), Some(header_writer)); - storage_writer.append_headers_from_blocks( + let new_td = storage_writer.append_headers_from_blocks( block.header().number, std::iter::once(&(block.header(), block.hash())), )?; @@ -163,11 +169,13 @@ impl PersistenceService { block.header().number, std::iter::once(&no_hash_transactions), )?; - } + + new_td + }; provider.commit()?; - Ok(block.number) + Ok((block.number, new_td)) } /// Write execution-related block data to static files. @@ -281,6 +289,13 @@ where self.write_execution_data(&blocks).expect("todo: handle errors"); // then write to db self.write(&blocks).expect("todo: handle errors"); + for block in &blocks { + // first write transactions + let (block_num, td) = self + .write_transactions(block.block.clone()) + .expect("todo: handle errors"); + self.update_transaction_meta(block_num, td).expect("todo: handle errors"); + } // we ignore the error because the caller may or may not care about the result let _ = sender.send(last_block_hash); @@ -292,8 +307,9 @@ where let _ = sender.send(res); } PersistenceAction::WriteTransactions((block, sender)) => { - let block_num = self.write_transactions(block).expect("todo: handle errors"); - self.update_transaction_meta(block_num).expect("todo: handle errors"); + let (block_num, td) = + self.write_transactions(block).expect("todo: handle errors"); + self.update_transaction_meta(block_num, td).expect("todo: handle errors"); // we ignore the error because the caller may or may not care about the result let _ = sender.send(()); diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index f9989184a5..faaf960037 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -133,7 +133,7 @@ where &mut self, initial_block_number: BlockNumber, headers: impl Iterator, - ) -> ProviderResult<()> + ) -> ProviderResult where I: Borrow<(H, B256)>, H: Borrow
, @@ -143,7 +143,7 @@ where let mut td_cursor = self.database_writer().tx_ref().cursor_read::()?; - let first_td = if initial_block_number == 0 { + let mut first_td = if initial_block_number == 0 { U256::ZERO } else { td_cursor @@ -155,11 +155,11 @@ where for pair in headers { let (header, hash) = pair.borrow(); let header = header.borrow(); - let td = first_td + header.difficulty; - self.static_file_writer().append_header(header, td, hash)?; + first_td += header.difficulty; + self.static_file_writer().append_header(header, first_td, hash)?; } - Ok(()) + Ok(first_td) } /// Appends transactions to static files, using the