Mirror of https://github.com/paradigmxyz/reth.git, synced 2026-01-27 16:18:08 -05:00
chore: combine static file and database services (#9713)
@@ -1,262 +0,0 @@
#![allow(dead_code)]

use crate::static_files::{StaticFileAction, StaticFileServiceHandle};
use reth_chain_state::ExecutedBlock;
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::B256;
use reth_provider::{
    writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
    OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
use tracing::debug;

/// Writes parts of reth's in memory tree state to the database.
///
/// This is meant to be a spawned service that listens for various incoming database operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// There are two types of operations this service can perform:
/// - Writing executed blocks to disk, returning the hash of the latest block that was inserted.
/// - Removing blocks from disk, returning the hash of the lowest block removed.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking database operations in an endless loop.
#[derive(Debug)]
pub struct DatabaseService<DB> {
    /// The db / static file provider to use
    provider: ProviderFactory<DB>,
    /// Incoming requests to persist stuff
    incoming: Receiver<DatabaseAction>,
    /// Handle for the static file service.
    static_file_handle: StaticFileServiceHandle,
    /// The pruner
    pruner: Pruner<DB, ProviderFactory<DB>>,
}

impl<DB: Database> DatabaseService<DB> {
    /// Create a new database service.
    ///
    /// NOTE: The [`ProviderFactory`] and [`Pruner`] should be built using an identical
    /// [`PruneModes`](reth_prune::PruneModes).
    pub const fn new(
        provider: ProviderFactory<DB>,
        incoming: Receiver<DatabaseAction>,
        static_file_handle: StaticFileServiceHandle,
        pruner: Pruner<DB, ProviderFactory<DB>>,
    ) -> Self {
        Self { provider, incoming, static_file_handle, pruner }
    }

    /// Writes the cloned tree state to the database
    fn write(&self, blocks: Vec<ExecutedBlock>) -> ProviderResult<()> {
        let provider_rw = self.provider.provider_rw()?;

        if blocks.is_empty() {
            debug!(target: "tree::persistence::db", "Attempted to write empty block range");
            return Ok(())
        }

        let first_number = blocks.first().unwrap().block().number;

        let last = blocks.last().unwrap().block();
        let last_block_number = last.number;

        // TODO: remove all the clones and do performant / batched writes for each type of object
        // instead of a loop over all blocks,
        // meaning:
        // * blocks
        // * state
        // * hashed state
        // * trie updates (cannot naively extend, need helper)
        // * indices (already done basically)
        // Insert the blocks
        for block in blocks {
            let sealed_block =
                block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
            provider_rw.insert_block(sealed_block)?;

            // Write state and changesets to the database.
            // Must be written after blocks because of the receipt lookup.
            let execution_outcome = block.execution_outcome().clone();
            // TODO: use single storage writer in task when sf / db tasks are combined
            execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;

            // insert hashes and intermediate merkle nodes
            {
                let trie_updates = block.trie_updates().clone();
                let hashed_state = block.hashed_state();
                // TODO: use single storage writer in task when sf / db tasks are combined
                let storage_writer = StorageWriter::new(Some(&provider_rw), None);
                storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?;
                trie_updates.write_to_database(provider_rw.tx_ref())?;
            }

            // update history indices
            provider_rw.update_history_indices(first_number..=last_block_number)?;

            // Update pipeline progress
            provider_rw.update_pipeline_stages(last_block_number, false)?;
        }

        debug!(target: "tree::persistence::db", range = ?first_number..=last_block_number, "Appended blocks");

        Ok(())
    }

    /// Removes block data above the given block number from the database.
    /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
    /// `block_number`.
    ///
    /// This will then send a command to the static file service, to remove the actual block data.
    fn remove_blocks_above(
        &self,
        block_number: u64,
        sender: oneshot::Sender<()>,
    ) -> ProviderResult<()> {
        let provider_rw = self.provider.provider_rw()?;
        let highest_block = self.provider.last_block_number()?;
        provider_rw.remove_block_and_execution_range(block_number..=highest_block)?;

        // send a command to the static file service to also remove blocks
        let _ = self
            .static_file_handle
            .send_action(StaticFileAction::RemoveBlocksAbove((block_number, sender)));
        Ok(())
    }

    /// Prunes block data before the given block hash according to the configured prune
    /// configuration.
    fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
        // TODO: doing this properly depends on pruner segment changes
        self.pruner.run(block_num).expect("todo: handle errors")
    }

    /// Updates checkpoints related to block headers and bodies. This should be called by the static
    /// file service, after new transactions have been successfully written to disk.
    fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> {
        let provider_rw = self.provider.provider_rw()?;
        provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
        provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
        provider_rw.commit()?;
        Ok(())
    }
}

impl<DB> DatabaseService<DB>
where
    DB: Database,
{
    /// This is the main loop, that will listen to database events and perform the requested
    /// database actions
    pub fn run(mut self) {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                DatabaseAction::RemoveBlocksAbove((new_tip_num, sender)) => {
                    self.remove_blocks_above(new_tip_num, sender).expect("todo: handle errors");
                }
                DatabaseAction::SaveBlocks((blocks, sender)) => {
                    if blocks.is_empty() {
                        todo!("return error or something");
                    }
                    let last_block_hash = blocks.last().unwrap().block().hash();
                    self.write(blocks).unwrap();

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(last_block_hash);
                }
                DatabaseAction::PruneBefore((block_num, sender)) => {
                    let res = self.prune_before(block_num);

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(res);
                }
                DatabaseAction::UpdateTransactionMeta((block_num, sender)) => {
                    self.update_transaction_meta(block_num).expect("todo: handle errors");

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(());
                }
            }
        }
    }
}

/// A signal to the database service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum DatabaseAction {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// This should just store the execution history-related data. Header, transaction, and
    /// receipt-related data should already be written to static files.
    SaveBlocks((Vec<ExecutedBlock>, oneshot::Sender<B256>)),

    /// Updates checkpoints related to block headers and bodies. This should be called by the
    /// static file service, after new transactions have been successfully written to disk.
    UpdateTransactionMeta((u64, oneshot::Sender<()>)),

    /// Removes block data above the given block number from the database.
    ///
    /// This will then send a command to the static file service, to remove the actual block data.
    RemoveBlocksAbove((u64, oneshot::Sender<()>)),

    /// Prune associated block data before the given block number, according to already-configured
    /// prune modes.
    PruneBefore((u64, oneshot::Sender<PrunerOutput>)),
}

/// A handle to the database service
#[derive(Debug, Clone)]
pub struct DatabaseServiceHandle {
    /// The channel used to communicate with the database service
    sender: Sender<DatabaseAction>,
}

impl DatabaseServiceHandle {
    /// Create a new [`DatabaseServiceHandle`] from a [`Sender<DatabaseAction>`].
    pub const fn new(sender: Sender<DatabaseAction>) -> Self {
        Self { sender }
    }

    /// Sends a specific [`DatabaseAction`] in the contained channel. The caller is responsible
    /// for creating any channels for the given action.
    pub fn send_action(&self, action: DatabaseAction) -> Result<(), SendError<DatabaseAction>> {
        self.sender.send(action)
    }

    /// Tells the database service to save a certain list of finalized blocks. The blocks are
    /// assumed to be ordered by block number.
    ///
    /// This returns the latest hash that has been saved, allowing removal of that block and any
    /// previous blocks from in-memory data structures.
    pub async fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> B256 {
        let (tx, rx) = oneshot::channel();
        self.sender.send(DatabaseAction::SaveBlocks((blocks, tx))).expect("should be able to send");
        rx.await.expect("todo: err handling")
    }

    /// Tells the database service to remove blocks above a certain block number.
    pub async fn remove_blocks_above(&self, block_num: u64) {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(DatabaseAction::RemoveBlocksAbove((block_num, tx)))
            .expect("should be able to send");
        rx.await.expect("todo: err handling")
    }

    /// Tells the database service to remove block data before the given hash, according to the
    /// configured prune config.
    pub async fn prune_before(&self, block_num: u64) -> PrunerOutput {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(DatabaseAction::PruneBefore((block_num, tx)))
            .expect("should be able to send");
        rx.await.expect("todo: err handling")
    }
}
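The deleted service above is a blocking actor: requests arrive on an `std::sync::mpsc` channel, each carrying its own reply channel, and the loop exits once every sender is dropped. A minimal std-only sketch of the same round trip (the real code uses `tokio::sync::oneshot` for replies; the names here are illustrative, not from the diff):

use std::sync::mpsc;
use std::thread;

// Stand-in for DatabaseAction: a request carrying its reply channel.
enum Action {
    Save(Vec<u64>, mpsc::Sender<u64>),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Action>();

    // The service owns the receiver and blocks on it in its own thread,
    // mirroring DatabaseService::run.
    let service = thread::spawn(move || {
        while let Ok(action) = rx.recv() {
            match action {
                Action::Save(blocks, reply) => {
                    // "persist", then reply with the last block number
                    let last = blocks.last().copied().unwrap_or_default();
                    let _ = reply.send(last);
                }
            }
        }
        // recv() erring means all senders are gone, so the loop ends.
    });

    // A handle is just the sender plus a per-request reply channel.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Action::Save(vec![1, 2, 3], reply_tx)).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), 3);

    drop(tx); // disconnect: the service loop exits
    service.join().unwrap();
}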
@@ -20,18 +20,14 @@ pub use reth_blockchain_tree_api::*;
pub mod backfill;
/// The type that drives the chain forward.
pub mod chain;
/// The background writer service for batch db writes.
pub mod database;
/// Support for downloading blocks on demand for live sync.
pub mod download;
/// Engine Api chain handler support.
pub mod engine;
/// Metrics support.
pub mod metrics;
/// The background writer service, coordinating the static file and database services.
/// The background writer service, coordinating write operations on static files and the database.
pub mod persistence;
/// The background writer service for static file writes.
pub mod static_files;
/// Support for interacting with the blockchain tree.
pub mod tree;
@@ -1,35 +1,328 @@
#![allow(dead_code)]

use crate::{
    database::{DatabaseAction, DatabaseService, DatabaseServiceHandle},
    static_files::{StaticFileAction, StaticFileService, StaticFileServiceHandle},
};
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_primitives::{SealedBlock, B256, U256};
use reth_provider::ProviderFactory;
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_provider::{
    writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
    OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
    StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
use std::sync::{
    mpsc::{SendError, Sender},
    mpsc::{Receiver, SendError, Sender},
    Arc,
};
use tokio::sync::oneshot;
use tracing::debug;

/// A signal to the database and static file services that part of the tree state can be persisted.
/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<DB> {
    /// The provider factory to use
    provider: ProviderFactory<DB>,
    /// Incoming requests
    incoming: Receiver<PersistenceAction>,
    /// The pruner
    pruner: Pruner<DB, ProviderFactory<DB>>,
}

impl<DB: Database> PersistenceService<DB> {
    /// Create a new persistence service
    pub const fn new(
        provider: ProviderFactory<DB>,
        incoming: Receiver<PersistenceAction>,
        pruner: Pruner<DB, ProviderFactory<DB>>,
    ) -> Self {
        Self { provider, incoming, pruner }
    }

    /// Writes the cloned tree state to database
    fn write(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
        debug!(target: "tree::persistence", "Writing blocks to database");
        let provider_rw = self.provider.provider_rw()?;

        if blocks.is_empty() {
            debug!(target: "tree::persistence", "Attempted to write empty block range");
            return Ok(())
        }

        let first_number = blocks.first().unwrap().block().number;

        let last = blocks.last().unwrap().block();
        let last_block_number = last.number;

        // TODO: remove all the clones and do performant / batched writes for each type of object
        // instead of a loop over all blocks,
        // meaning:
        // * blocks
        // * state
        // * hashed state
        // * trie updates (cannot naively extend, need helper)
        // * indices (already done basically)
        // Insert the blocks
        for block in blocks {
            let sealed_block =
                block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
            provider_rw.insert_block(sealed_block)?;

            // Write state and changesets to the database.
            // Must be written after blocks because of the receipt lookup.
            let execution_outcome = block.execution_outcome().clone();
            // TODO: do we provide a static file producer here?
            execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;

            // insert hashes and intermediate merkle nodes
            {
                let trie_updates = block.trie_updates().clone();
                let hashed_state = block.hashed_state();
                // TODO: use single storage writer in task when sf / db tasks are combined
                let storage_writer = StorageWriter::new(Some(&provider_rw), None);
                storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?;
                trie_updates.write_to_database(provider_rw.tx_ref())?;
            }

            // update history indices
            provider_rw.update_history_indices(first_number..=last_block_number)?;

            // Update pipeline progress
            provider_rw.update_pipeline_stages(last_block_number, false)?;
        }

        debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");

        Ok(())
    }

    /// Removes block data above the given block number from the database.
    /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
    /// `block_number`.
    ///
    /// This will then send a command to the static file service, to remove the actual block data.
    fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
        debug!(target: "tree::persistence", ?block_number, "Removing blocks from database above block_number");
        let provider_rw = self.provider.provider_rw()?;
        let highest_block = self.provider.last_block_number()?;
        provider_rw.remove_block_and_execution_range(block_number..=highest_block)?;

        Ok(())
    }

    /// Prunes block data before the given block hash according to the configured prune
    /// configuration.
    fn prune_before(&mut self, block_num: u64) -> PrunerOutput {
        debug!(target: "tree::persistence", ?block_num, "Running pruner");
        // TODO: doing this properly depends on pruner segment changes
        self.pruner.run(block_num).expect("todo: handle errors")
    }

    /// Updates checkpoints related to block headers and bodies. This should be called after new
    /// transactions have been successfully written to disk.
    fn update_transaction_meta(&self, block_num: u64) -> ProviderResult<()> {
        debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
        let provider_rw = self.provider.provider_rw()?;
        provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
        provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
        provider_rw.commit()?;
        Ok(())
    }

    // TODO: perform read for td and start
    /// Writes the transactions to static files, to act as a log.
    ///
    /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
    /// after this, to update the checkpoints for headers and block bodies.
    fn log_transactions(
        &self,
        block: Arc<SealedBlock>,
        start_tx_number: u64,
        td: U256,
    ) -> ProviderResult<u64> {
        debug!(target: "tree::persistence", ?td, ?start_tx_number, "Logging transactions");
        let provider = self.provider.static_file_provider();
        let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
        let mut transactions_writer =
            provider.get_writer(block.number, StaticFileSegment::Transactions)?;

        header_writer.append_header(block.header(), td, &block.hash())?;
        let no_hash_transactions =
            block.body.clone().into_iter().map(TransactionSignedNoHash::from);

        let mut tx_number = start_tx_number;
        for tx in no_hash_transactions {
            transactions_writer.append_transaction(tx_number, &tx)?;
            tx_number += 1;
        }

        // increment block for transactions
        transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?;

        // finally commit
        transactions_writer.commit()?;
        header_writer.commit()?;

        Ok(block.number)
    }

    /// Write execution-related block data to static files.
    ///
    /// This will then send a command to the db service, that it should write new data, and update
    /// the checkpoints for execution and beyond.
    fn write_execution_data(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
        if blocks.is_empty() {
            return Ok(())
        }
        let provider_rw = self.provider.provider_rw()?;
        let provider = self.provider.static_file_provider();

        // NOTE: checked non-empty above
        let first_block = blocks.first().unwrap().block();
        let last_block = blocks.last().unwrap().block().clone();

        // use the storage writer
        let current_block = first_block.number;
        debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files");

        let receipts_writer =
            provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;

        let storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
        let receipts_iter = blocks.iter().map(|block| {
            let receipts = block.execution_outcome().receipts().receipt_vec.clone();
            debug_assert!(receipts.len() == 1);
            receipts.first().unwrap().clone()
        });
        storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;

        Ok(())
    }

    /// Removes the blocks above the given block number from static files. Also removes related
    /// receipt and header data.
    ///
    /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
    /// `block_number`.
    ///
    /// Returns the block hash for the lowest block removed from the database, which should be
    /// the hash for `block_number + 1`.
    ///
    /// This is meant to be called by the db service, as this should only be done after related data
    /// is removed from the database, and checkpoints are updated.
    ///
    /// Returns the hash of the lowest removed block.
    fn remove_static_file_blocks_above(&self, block_number: u64) -> ProviderResult<()> {
        debug!(target: "tree::persistence", ?block_number, "Removing static file blocks above block_number");
        let sf_provider = self.provider.static_file_provider();
        let db_provider_rw = self.provider.provider_rw()?;

        // get highest static file block for the total block range
        let highest_static_file_block = sf_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        // Get the total txs for the block range, so we have the correct number of columns for
        // receipts and transactions
        let tx_range = db_provider_rw
            .transaction_range_by_block_range(block_number..=highest_static_file_block)?;
        let total_txs = tx_range.end().saturating_sub(*tx_range.start());

        // get the writers
        let mut header_writer = sf_provider.get_writer(block_number, StaticFileSegment::Headers)?;
        let mut transactions_writer =
            sf_provider.get_writer(block_number, StaticFileSegment::Transactions)?;
        let mut receipts_writer =
            sf_provider.get_writer(block_number, StaticFileSegment::Receipts)?;

        // finally actually truncate, these internally commit
        receipts_writer.prune_receipts(total_txs, block_number)?;
        transactions_writer.prune_transactions(total_txs, block_number)?;
        header_writer.prune_headers(highest_static_file_block.saturating_sub(block_number))?;

        sf_provider.commit()?;

        Ok(())
    }
}

impl<DB> PersistenceService<DB>
where
    DB: Database,
{
    /// This is the main loop, that will listen to database events and perform the requested
    /// database actions
    pub fn run(mut self) {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove((new_tip_num, sender)) => {
                    self.remove_blocks_above(new_tip_num).expect("todo: handle errors");
                    self.remove_static_file_blocks_above(new_tip_num).expect("todo: handle errors");

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(());
                }
                PersistenceAction::SaveBlocks((blocks, sender)) => {
                    if blocks.is_empty() {
                        todo!("return error or something");
                    }
                    let last_block_hash = blocks.last().unwrap().block().hash();
                    // first write to static files
                    self.write_execution_data(&blocks).expect("todo: handle errors");
                    // then write to db
                    self.write(&blocks).expect("todo: handle errors");

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(last_block_hash);
                }
                PersistenceAction::PruneBefore((block_num, sender)) => {
                    let res = self.prune_before(block_num);

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(res);
                }
                PersistenceAction::LogTransactions((block, start_tx_number, td, sender)) => {
                    let block_num = self
                        .log_transactions(block, start_tx_number, td)
                        .expect("todo: handle errors");
                    self.update_transaction_meta(block_num).expect("todo: handle errors");

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(());
                }
            }
        }
    }
}
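The `log_transactions` method takes `start_tx_number` from the caller; the TODO above it notes that deriving it needs extra in-memory data or a db read. One plausible derivation, mirroring how the (now removed) static file service further below seeds its receipt numbers from the highest entry already in static files — a hypothetical helper, not part of this diff, assuming the same provider APIs used above:

// Hypothetical helper: the next transaction number is one past the highest
// transaction already written to static files, or zero on the first write,
// the same pattern the old receipts path used.
fn next_tx_number<DB: Database>(factory: &ProviderFactory<DB>) -> u64 {
    factory
        .static_file_provider()
        .get_highest_static_file_tx(StaticFileSegment::Transactions)
        .map(|num| num + 1)
        .unwrap_or_default()
}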
/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction {
    /// The given block has been added to the canonical chain, its transactions and headers will be
    /// persisted for durability.
    LogTransactions((Arc<SealedBlock>, u64, U256, oneshot::Sender<()>)),

    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// This should just store the execution history-related data. Header, transaction, and
    /// receipt-related data should already be written to static files.
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    SaveBlocks((Vec<ExecutedBlock>, oneshot::Sender<B256>)),

    /// The given block has been added to the canonical chain, its transactions and headers will be
    /// persisted for durability.
    ///
    /// This will first append the header and transactions to static files, then update the
    /// checkpoints for headers and block bodies in the database.
    LogTransactions((Arc<SealedBlock>, u64, U256, oneshot::Sender<()>)),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    RemoveBlocksAbove((u64, oneshot::Sender<()>)),

    /// Prune associated block data before the given block number, according to already-configured
@@ -37,111 +330,47 @@ pub enum PersistenceAction {
    PruneBefore((u64, oneshot::Sender<PrunerOutput>)),
}
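Every `PersistenceAction` variant carries its reply channel inline, so a caller without a convenience method (this diff shows none for `LogTransactions`) builds the oneshot pair itself. A hedged caller-side sketch, assuming an async context with `handle: PersistenceHandle`, plus `block: Arc<SealedBlock>`, `start_tx_number: u64`, and `td: U256` already in scope:

// Sketch only; the bindings named above are assumptions, not from the diff.
let (tx, rx) = tokio::sync::oneshot::channel();
handle
    .send_action(PersistenceAction::LogTransactions((block, start_tx_number, td, tx)))
    .expect("persistence service should be alive");
// Resolves once the header and transactions are on disk and the
// header/body checkpoints have been updated.
rx.await.expect("todo: err handling");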
/// An error type for when there is a [`SendError`] while sending an action to one of the services.
#[derive(Debug)]
pub enum PersistenceSendError {
    /// When there is an error sending to the static file service
    StaticFile(SendError<StaticFileAction>),
    /// When there is an error sending to the database service
    Database(SendError<DatabaseAction>),
}

impl From<SendError<StaticFileAction>> for PersistenceSendError {
    fn from(value: SendError<StaticFileAction>) -> Self {
        Self::StaticFile(value)
    }
}

impl From<SendError<DatabaseAction>> for PersistenceSendError {
    fn from(value: SendError<DatabaseAction>) -> Self {
        Self::Database(value)
    }
}

/// A handle to the database and static file services. This will send commands to the correct
/// service, depending on the command.
///
/// Some commands should be sent to the database service, and others should be sent to the static
/// file service, despite having the same name. This is because some actions require work to be done
/// by both the static file _and_ the database service, and require some coordination.
///
/// This type is what actually coordinates the two services, and should be used by consumers of the
/// persistence related services.
/// A handle to the persistence service
#[derive(Debug, Clone)]
pub struct PersistenceHandle {
    /// The channel used to communicate with the database service
    db_sender: Sender<DatabaseAction>,
    /// The channel used to communicate with the static file service
    static_file_sender: Sender<StaticFileAction>,
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction>,
}

impl PersistenceHandle {
    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
    pub const fn new(
        db_sender: Sender<DatabaseAction>,
        static_file_sender: Sender<StaticFileAction>,
    ) -> Self {
        Self { db_sender, static_file_sender }
    pub const fn new(sender: Sender<PersistenceAction>) -> Self {
        Self { sender }
    }

    /// Create a new [`PersistenceHandle`], and spawn the database and static file services.
    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
    pub fn spawn_services<DB: Database + 'static>(
        provider_factory: ProviderFactory<DB>,
        pruner: Pruner<DB, ProviderFactory<DB>>,
    ) -> Self {
        // create the initial channels
        let (static_file_service_tx, static_file_service_rx) = std::sync::mpsc::channel();
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // construct persistence handle
        let persistence_handle = Self::new(db_service_tx.clone(), static_file_service_tx.clone());
        let persistence_handle = Self::new(db_service_tx);

        // construct handles for the services to talk to each other
        let static_file_handle = StaticFileServiceHandle::new(static_file_service_tx);
        let database_handle = DatabaseServiceHandle::new(db_service_tx);

        // spawn the db service
        let db_service = DatabaseService::new(
            provider_factory.clone(),
            db_service_rx,
            static_file_handle,
            pruner,
        );
        // spawn the persistence service
        let db_service = PersistenceService::new(provider_factory, db_service_rx, pruner);
        std::thread::Builder::new()
            .name("Database Service".to_string())
            .name("Persistence Service".to_string())
            .spawn(|| db_service.run())
            .unwrap();

        // spawn the static file service
        let static_file_service =
            StaticFileService::new(provider_factory, static_file_service_rx, database_handle);
        std::thread::Builder::new()
            .name("Static File Service".to_string())
            .spawn(|| static_file_service.run())
            .unwrap();

        persistence_handle
    }

    /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
    /// for creating any channels for the given action.
    pub fn send_action(&self, action: PersistenceAction) -> Result<(), PersistenceSendError> {
        match action {
            PersistenceAction::LogTransactions(input) => self
                .static_file_sender
                .send(StaticFileAction::LogTransactions(input))
                .map_err(From::from),
            PersistenceAction::SaveBlocks(input) => self
                .static_file_sender
                .send(StaticFileAction::WriteExecutionData(input))
                .map_err(From::from),
            PersistenceAction::RemoveBlocksAbove(input) => {
                self.db_sender.send(DatabaseAction::RemoveBlocksAbove(input)).map_err(From::from)
            }
            PersistenceAction::PruneBefore(input) => {
                self.db_sender.send(DatabaseAction::PruneBefore(input)).map_err(From::from)
            }
        }
    pub fn send_action(
        &self,
        action: PersistenceAction,
    ) -> Result<(), SendError<PersistenceAction>> {
        self.sender.send(action)
    }

    /// Tells the persistence service to save a certain list of finalized blocks. The blocks are
@@ -207,6 +436,7 @@ mod tests {

    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
@@ -220,6 +450,7 @@ mod tests {

    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();
        let block_number = 0;
        let executed = get_executed_block_with_number(block_number);
@@ -236,6 +467,7 @@ mod tests {

    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = get_executed_blocks(0..5).collect::<Vec<_>>();
@@ -250,6 +482,7 @@ mod tests {

    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
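From the consumer's side the merge leaves the wiring unchanged: one spawn call, one handle. A sketch under the assumption that `provider_factory`, `pruner`, `executed_blocks`, and `new_tip_number` are already built (their construction is elided), and that the convenience methods shown on the old handle carry over to the new one:

// Hypothetical async caller.
let handle = PersistenceHandle::spawn_services(provider_factory, pruner);

// Persist a batch and learn the hash of the last block written:
let last_hash: B256 = handle.save_blocks(executed_blocks).await;

// On a reorg, drop everything above the new tip; the service removes the
// database range first, then the static file range.
handle.remove_blocks_above(new_tip_number).await;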
@@ -1,269 +0,0 @@
#![allow(dead_code)]

use crate::database::{DatabaseAction, DatabaseServiceHandle};
use reth_chain_state::ExecutedBlock;
use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_provider::{
    ProviderFactory, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt,
};
use std::sync::{
    mpsc::{Receiver, SendError, Sender},
    Arc,
};
use tokio::sync::oneshot;

/// Writes finalized blocks to reth's static files.
///
/// This is meant to be a spawned service that listens for various incoming finalization operations,
/// and writing to or producing new static files.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking file operations in an endless loop.
#[derive(Debug)]
pub struct StaticFileService<DB> {
    /// The db / static file provider to use
    provider: ProviderFactory<DB>,
    /// Handle for the database service
    database_handle: DatabaseServiceHandle,
    /// Incoming requests to write static files
    incoming: Receiver<StaticFileAction>,
}

impl<DB> StaticFileService<DB>
where
    DB: Database + 'static,
{
    /// Create a new static file service.
    pub const fn new(
        provider: ProviderFactory<DB>,
        incoming: Receiver<StaticFileAction>,
        database_handle: DatabaseServiceHandle,
    ) -> Self {
        Self { provider, database_handle, incoming }
    }

    // TODO: some things about this are a bit weird, and just to make the underlying static file
    // writes work - tx number, total difficulty inclusion. They require either additional in memory
    // data or a db lookup. Maybe we can use a db read here
    /// Writes the transactions to static files, to act as a log.
    ///
    /// This will then send a command to the db service, that it should update the checkpoints for
    /// headers and block bodies.
    fn log_transactions(
        &self,
        block: Arc<SealedBlock>,
        start_tx_number: u64,
        td: U256,
        sender: oneshot::Sender<()>,
    ) -> ProviderResult<()> {
        let provider = self.provider.static_file_provider();
        let mut header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
        let mut transactions_writer =
            provider.get_writer(block.number, StaticFileSegment::Transactions)?;

        // TODO: does to_compact require ownership?
        header_writer.append_header(block.header(), td, &block.hash())?;
        let no_hash_transactions =
            block.body.clone().into_iter().map(TransactionSignedNoHash::from);

        let mut tx_number = start_tx_number;
        for tx in no_hash_transactions {
            transactions_writer.append_transaction(tx_number, &tx)?;
            tx_number += 1;
        }

        // increment block for transactions
        transactions_writer.increment_block(StaticFileSegment::Transactions, block.number)?;

        // finally commit
        transactions_writer.commit()?;
        header_writer.commit()?;

        // TODO: do we care about the mpsc error here?
        // send a command to the db service to update the checkpoints for headers / bodies
        let _ = self
            .database_handle
            .send_action(DatabaseAction::UpdateTransactionMeta((block.number, sender)));

        Ok(())
    }

    /// Write execution-related block data to static files.
    ///
    /// This will then send a command to the db service, that it should write new data, and update
    /// the checkpoints for execution and beyond.
    fn write_execution_data(
        &self,
        blocks: Vec<ExecutedBlock>,
        sender: oneshot::Sender<B256>,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            return Ok(())
        }
        let provider = self.provider.static_file_provider();

        // NOTE: checked non-empty above
        let first_block = blocks.first().unwrap().block();
        let last_block = blocks.last().unwrap().block();

        // get highest receipt, if it returns none, use zero (this is the first static file write)
        let mut current_receipt = provider
            .get_highest_static_file_tx(StaticFileSegment::Receipts)
            .map(|num| num + 1)
            .unwrap_or_default();
        let mut current_block = first_block.number;

        let mut receipts_writer =
            provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
        for receipts in blocks.iter().map(|block| block.execution_outcome().receipts.clone()) {
            debug_assert!(receipts.len() == 1);
            // TODO: should we also assert that the receipt is not None here, that means the
            // receipt is pruned
            for maybe_receipt in receipts.first().unwrap() {
                if let Some(receipt) = maybe_receipt {
                    receipts_writer.append_receipt(current_receipt, receipt)?;
                }
                current_receipt += 1;
            }

            // increment the block
            receipts_writer.increment_block(StaticFileSegment::Receipts, current_block)?;
            current_block += 1;
        }

        // finally increment block and commit
        receipts_writer.commit()?;

        // TODO: do we care about the mpsc error here?
        // send a command to the db service to update the checkpoints for execution etc.
        let _ = self.database_handle.send_action(DatabaseAction::SaveBlocks((blocks, sender)));

        Ok(())
    }

    /// Removes the blocks above the given block number from static files. Also removes related
    /// receipt and header data.
    ///
    /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
    /// `block_number`.
    ///
    /// Returns the block hash for the lowest block removed from the database, which should be
    /// the hash for `block_number + 1`.
    ///
    /// This is meant to be called by the db service, as this should only be done after related data
    /// is removed from the database, and checkpoints are updated.
    ///
    /// Returns the hash of the lowest removed block.
    fn remove_blocks_above(
        &self,
        block_num: u64,
        sender: oneshot::Sender<()>,
    ) -> ProviderResult<()> {
        let sf_provider = self.provider.static_file_provider();
        let db_provider_rw = self.provider.provider_rw()?;

        // get highest static file block for the total block range
        let highest_static_file_block = sf_provider
            .get_highest_static_file_block(StaticFileSegment::Headers)
            .expect("todo: error handling, headers should exist");

        // Get the total txs for the block range, so we have the correct number of columns for
        // receipts and transactions
        let tx_range = db_provider_rw
            .transaction_range_by_block_range(block_num..=highest_static_file_block)?;
        let total_txs = tx_range.end().saturating_sub(*tx_range.start());

        // get the writers
        let mut header_writer = sf_provider.get_writer(block_num, StaticFileSegment::Headers)?;
        let mut transactions_writer =
            sf_provider.get_writer(block_num, StaticFileSegment::Transactions)?;
        let mut receipts_writer = sf_provider.get_writer(block_num, StaticFileSegment::Receipts)?;

        // finally actually truncate, these internally commit
        receipts_writer.prune_receipts(total_txs, block_num)?;
        transactions_writer.prune_transactions(total_txs, block_num)?;
        header_writer.prune_headers(highest_static_file_block.saturating_sub(block_num))?;

        sf_provider.commit()?;

        Ok(())
    }
}

impl<DB> StaticFileService<DB>
where
    DB: Database + 'static,
{
    /// This is the main loop, that will listen to static file actions, and write DB data to static
    /// files.
    pub fn run(self) {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                StaticFileAction::LogTransactions((
                    block,
                    start_tx_number,
                    td,
                    response_sender,
                )) => {
                    self.log_transactions(block, start_tx_number, td, response_sender)
                        .expect("todo: handle errors");
                }
                StaticFileAction::RemoveBlocksAbove((block_num, response_sender)) => {
                    self.remove_blocks_above(block_num, response_sender)
                        .expect("todo: handle errors");
                }
                StaticFileAction::WriteExecutionData((blocks, response_sender)) => {
                    self.write_execution_data(blocks, response_sender)
                        .expect("todo: handle errors");
                }
            }
        }
    }
}

/// A signal to the static file service that some data should be copied from the DB to static files.
#[derive(Debug)]
pub enum StaticFileAction {
    /// The given block has been added to the canonical chain, its transactions and headers will be
    /// persisted for durability.
    ///
    /// This will then send a command to the db service, that it should update the checkpoints for
    /// headers and block bodies.
    LogTransactions((Arc<SealedBlock>, u64, U256, oneshot::Sender<()>)),

    /// Write execution-related block data to static files.
    ///
    /// This will then send a command to the db service, that it should write new data, and update
    /// the checkpoints for execution and beyond.
    WriteExecutionData((Vec<ExecutedBlock>, oneshot::Sender<B256>)),

    /// Removes the blocks above the given block number from static files. Also removes related
    /// receipt and header data.
    ///
    /// This is meant to be called by the db service, as this should only be done after related
    /// data is removed from the database, and checkpoints are updated.
    RemoveBlocksAbove((u64, oneshot::Sender<()>)),
}

/// A handle to the static file service
#[derive(Debug, Clone)]
pub struct StaticFileServiceHandle {
    /// The channel used to communicate with the static file service
    sender: Sender<StaticFileAction>,
}

impl StaticFileServiceHandle {
    /// Create a new [`StaticFileServiceHandle`] from a [`Sender<StaticFileAction>`].
    pub const fn new(sender: Sender<StaticFileAction>) -> Self {
        Self { sender }
    }

    /// Sends a specific [`StaticFileAction`] in the contained channel. The caller is responsible
    /// for creating any channels for the given action.
    pub fn send_action(&self, action: StaticFileAction) -> Result<(), SendError<StaticFileAction>> {
        self.sender.send(action)
    }
}
@@ -1357,7 +1357,7 @@ impl PersistenceState {
#[cfg(test)]
mod tests {
    use super::*;
    use crate::static_files::StaticFileAction;
    use crate::persistence::PersistenceAction;
    use reth_beacon_consensus::EthBeaconConsensus;
    use reth_chain_state::{test_utils::get_executed_blocks, BlockState};
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
@@ -1372,7 +1372,7 @@ mod tests {
        tree: EngineApiTreeHandlerImpl<MockEthProvider, MockExecutorProvider, EthEngineTypes>,
        to_tree_tx: Sender<FromEngine<BeaconEngineMessage<EthEngineTypes>>>,
        blocks: Vec<ExecutedBlock>,
        sf_action_rx: Receiver<StaticFileAction>,
        action_rx: Receiver<PersistenceAction>,
        payload_command_rx: UnboundedReceiver<PayloadServiceCommand<EthEngineTypes>>,
    }

@@ -1395,8 +1395,7 @@ mod tests {
        let tree_state = TreeState { blocks_by_hash, blocks_by_number, ..Default::default() };

        let (action_tx, action_rx) = channel();
        let (sf_action_tx, sf_action_rx) = channel();
        let persistence_handle = PersistenceHandle::new(action_tx, sf_action_tx);
        let persistence_handle = PersistenceHandle::new(action_tx);

        let chain_spec = Arc::new(
            ChainSpecBuilder::default()
@@ -1444,22 +1443,22 @@ mod tests {
        tree.canonical_in_memory_state =
            CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending);

        TestHarness { tree, to_tree_tx, blocks, sf_action_rx, payload_command_rx }
        TestHarness { tree, to_tree_tx, blocks, action_rx, payload_command_rx }
    }

    #[tokio::test]
    async fn test_tree_persist_blocks() {
        // we need more than PERSISTENCE_THRESHOLD blocks to trigger the
        // persistence task.
        let TestHarness { tree, to_tree_tx, sf_action_rx, mut blocks, payload_command_rx } =
        let TestHarness { tree, to_tree_tx, action_rx, mut blocks, payload_command_rx } =
            get_default_test_harness(PERSISTENCE_THRESHOLD + 1);
        std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| tree.run()).unwrap();

        // send a message to the tree to enter the main loop.
        to_tree_tx.send(FromEngine::DownloadedBlocks(vec![])).unwrap();

        let received_action = sf_action_rx.recv().expect("Failed to receive saved blocks");
        if let StaticFileAction::WriteExecutionData((saved_blocks, _)) = received_action {
        let received_action = action_rx.recv().expect("Failed to receive saved blocks");
        if let PersistenceAction::SaveBlocks((saved_blocks, _)) = received_action {
            // only PERSISTENCE_THRESHOLD will be persisted
            blocks.pop();
            assert_eq!(saved_blocks.len() as u64, PERSISTENCE_THRESHOLD);
@@ -1471,7 +1470,7 @@ mod tests {

    #[tokio::test]
    async fn test_in_memory_state_trait_impl() {
        let TestHarness { tree, to_tree_tx, sf_action_rx, blocks, payload_command_rx } =
        let TestHarness { tree, to_tree_tx, action_rx, blocks, payload_command_rx } =
            get_default_test_harness(10);

        let head_block = blocks.last().unwrap().block();
@@ -1494,7 +1493,7 @@ mod tests {

    #[tokio::test]
    async fn test_engine_request_during_backfill() {
        let TestHarness { mut tree, to_tree_tx, sf_action_rx, blocks, payload_command_rx } =
        let TestHarness { mut tree, to_tree_tx, action_rx, blocks, payload_command_rx } =
            get_default_test_harness(PERSISTENCE_THRESHOLD);

        // set backfill active
@@ -171,14 +171,28 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
            StorageType::StaticFile(self.static_file_writer())
        };

        let mut last_tx_idx = None;
        for (idx, receipts) in blocks.enumerate() {
            let block_number = initial_block_number + idx as u64;

            let first_tx_index = bodies_cursor
                .seek_exact(block_number)?
                .map(|(_, indices)| indices.first_tx_num())
            let mut first_tx_index =
                bodies_cursor.seek_exact(block_number)?.map(|(_, indices)| indices.first_tx_num());

            // If there are no indices, that means there have been no transactions
            //
            // So instead of returning an error, use zero
            if block_number == initial_block_number && first_tx_index.is_none() {
                first_tx_index = Some(0);
            }

            // TODO: I guess this error will never be returned
            let first_tx_index = first_tx_index
                .or(last_tx_idx)
                .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?;

            // update for empty blocks
            last_tx_idx = Some(first_tx_index);

            match &mut storage_type {
                StorageType::Database(cursor) => {
                    DatabaseWriter(cursor).append_block_receipts(
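The new fallback above threads `last_tx_idx` through the loop so that a block with no stored body indices inherits the previous block's first transaction index, and an absent index on the very first block maps to zero instead of erroring. A std-only sketch of just that bookkeeping, with the cursor lookups replaced by a slice of `Option` values (names are illustrative):

// Models only the index-fallback logic from the hunk above.
fn resolve_first_tx_indices(lookups: &[Option<u64>]) -> Vec<u64> {
    let mut last_tx_idx: Option<u64> = None;
    lookups
        .iter()
        .enumerate()
        .map(|(i, looked_up)| {
            // No indices for the very first block means no transactions yet,
            // so seed with zero instead of returning an error.
            let mut first = *looked_up;
            if i == 0 && first.is_none() {
                first = Some(0);
            }
            // Empty later blocks fall back to the previous first tx index.
            let first = first.or(last_tx_idx).expect("first block seeded above");
            last_tx_idx = Some(first); // carry forward across empty blocks
            first
        })
        .collect()
}

fn main() {
    assert_eq!(resolve_first_tx_indices(&[None, Some(0), None]), vec![0, 0, 0]);
    assert_eq!(resolve_first_tx_indices(&[Some(5), None, Some(9)]), vec![5, 5, 9]);
}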