add transaction-blocks segment

This commit is contained in:
joshie
2025-11-07 14:18:43 +00:00
parent 177ad4c0b8
commit ebbe85ae15
12 changed files with 326 additions and 101 deletions

View File

@@ -2,7 +2,8 @@ use alloy_primitives::{hex, BlockHash};
use clap::Parser;
use reth_db::{
static_file::{
ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask, ReceiptMask, TransactionMask,
ColumnSelectorOne, ColumnSelectorTwo, HeaderWithHashMask, ReceiptMask,
TransactionBlocksMask, TransactionMask,
},
RawDupSort,
};
@@ -75,6 +76,10 @@ impl Command {
StaticFileSegment::Receipts => {
(table_key::<tables::Receipts>(&key)?, <ReceiptMask<ReceiptTy<N>>>::MASK)
}
StaticFileSegment::TransactionBlocks => (
table_key::<tables::TransactionBlocks>(&key)?,
<TransactionBlocksMask>::MASK,
),
};
let content = tool
@@ -114,6 +119,13 @@ impl Command {
)?;
println!("{}", serde_json::to_string_pretty(&receipt)?);
}
StaticFileSegment::TransactionBlocks => {
let block_number =
<<tables::TransactionBlocks as Table>::Value>::decompress(
content[0].as_slice(),
)?;
println!("{}", serde_json::to_string_pretty(&block_number)?);
}
}
}
}

View File

@@ -40,15 +40,17 @@ impl<C: ChainSpecParser> Command<C> {
let tool = DbTool::new(provider_factory)?;
let static_file_segment = match self.stage {
StageEnum::Headers => Some(StaticFileSegment::Headers),
StageEnum::Bodies => Some(StaticFileSegment::Transactions),
StageEnum::Execution => Some(StaticFileSegment::Receipts),
_ => None,
let static_file_segments = match self.stage {
StageEnum::Headers => vec![StaticFileSegment::Headers],
StageEnum::Bodies => {
vec![StaticFileSegment::Transactions, StaticFileSegment::TransactionBlocks]
}
StageEnum::Execution => vec![StaticFileSegment::Receipts],
_ => vec![],
};
// Delete static file segment data before inserting the genesis header below
if let Some(static_file_segment) = static_file_segment {
for static_file_segment in static_file_segments {
let static_file_provider = tool.provider_factory.static_file_provider();
let static_files = iter_static_files(static_file_provider.directory())?;
if let Some(segment_static_files) = static_files.get(&static_file_segment) {

View File

@@ -471,6 +471,7 @@ mod tests {
}
mod test_utils {
use reth_provider::{BlockReader, EitherWriter};
use crate::{
stages::bodies::BodyStage,
test_utils::{
@@ -584,7 +585,8 @@ mod tests {
if let Some(progress) = blocks.get(start as usize) {
// Insert last progress data
{
let tx = self.db.factory.provider_rw()?.into_tx();
let provider = self.db.factory.provider_rw()?;
let tx = provider.tx_ref();
let mut static_file_producer = static_file_provider
.get_writer(start, StaticFileSegment::Transactions)?;
@@ -600,11 +602,16 @@ mod tests {
static_file_producer.append_transaction(tx_num, &transaction).map(drop)
})?;
let mut tx_block_writer = EitherWriter::new_transaction_blocks(
&*provider,
&static_file_provider,
start,
)?;
tx_block_writer.increment_block(progress.number)?;
if body.tx_count != 0 {
tx.put::<tables::TransactionBlocks>(
body.last_tx_num(),
progress.number,
)?;
tx_block_writer
.append_transaction_block(body.last_tx_num(), &progress.number)?;
}
tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
@@ -616,8 +623,9 @@ mod tests {
)?;
}
drop(tx_block_writer);
static_file_producer.commit()?;
tx.commit()?;
provider.commit()?;
}
}
self.set_responses(blocks.iter().map(body_by_hash).collect());
@@ -679,62 +687,62 @@ mod tests {
prev_progress: BlockNumber,
highest_block: BlockNumber,
) -> Result<(), TestRunnerError> {
let static_file_provider = self.db.factory.static_file_provider();
let provider = self.db.factory.provider()?;
let tx = provider.tx_ref();
self.db.query(|tx| {
// Acquire cursors on body related tables
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
// Acquire cursors on body related tables
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let first_body_key = match bodies_cursor.first()? {
Some((key, _)) => key,
None => return Ok(()),
};
let first_body_key = match bodies_cursor.first()? {
Some((key, _)) => key,
None => return Ok(()),
};
let mut prev_number: Option<BlockNumber> = None;
let mut prev_number: Option<BlockNumber> = None;
for entry in bodies_cursor.walk(Some(first_body_key))? {
let (number, body) = entry?;
for entry in bodies_cursor.walk(Some(first_body_key))? {
let (number, body) = entry?;
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if number > prev_progress
&& let Some(prev_key) = prev_number {
assert_eq!(prev_key + 1, number, "Body entries must be sequential");
}
// Validate that the current entry is below or equal to the highest allowed block
assert!(
number <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
);
let header = static_file_provider.header_by_number(number)?.expect("to be present");
// Validate that ommers exist if any
let stored_ommers = ommers_cursor.seek_exact(number)?;
if header.ommers_hash_is_empty() {
assert!(stored_ommers.is_none(), "Unexpected ommers entry");
} else {
assert!(stored_ommers.is_some(), "Missing ommers entry");
}
let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
if body.tx_count == 0 {
assert_ne!(tx_block_id,Some(number));
} else {
assert_eq!(tx_block_id, Some(number));
}
for tx_id in body.tx_num_range() {
assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
}
prev_number = Some(number);
// Validate sequentiality only after prev progress,
// since the data before is mocked and can contain gaps
if number > prev_progress
&& let Some(prev_key) = prev_number
{
assert_eq!(prev_key + 1, number, "Body entries must be sequential");
}
Ok(())
})?;
// Validate that the current entry is below or equal to the highest allowed block
assert!(
number <= highest_block,
"We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
);
let header = provider.header_by_number(number)?.expect("to be present");
// Validate that ommers exist if any
let stored_ommers = ommers_cursor.seek_exact(number)?;
if header.ommers_hash_is_empty() {
assert!(stored_ommers.is_none(), "Unexpected ommers entry");
} else {
assert!(stored_ommers.is_some(), "Missing ommers entry");
}
let tx_block_id = provider.block_by_transaction_id(body.last_tx_num())?;
if body.tx_count == 0 {
assert_ne!(tx_block_id, Some(number));
} else {
assert_eq!(tx_block_id, Some(number));
}
for tx_id in body.tx_num_range() {
assert!(
provider.transaction_by_id(tx_id)?.is_some(),
"Transaction is missing."
);
}
prev_number = Some(number);
}
Ok(())
}
}

View File

@@ -340,7 +340,7 @@ mod tests {
};
use reth_ethereum_primitives::TransactionSigned;
use reth_primitives_traits::{SealedBlock, SealedHeader};
use reth_provider::{BlockNumReader, HeaderProvider, TransactionsProvider};
use reth_provider::{BlockNumReader, EitherWriter, HeaderProvider, TransactionsProvider};
use reth_testing_utils::generators::{
random_block_range, random_signed_tx, BlockRangeParams,
};
@@ -399,7 +399,8 @@ mod tests {
if let Some(progress) = blocks.get(start as usize) {
// Insert last progress data
{
let tx = self.db.factory.provider_rw()?.into_tx();
let provider = self.db.factory.provider_rw()?;
let tx = provider.tx_ref();
let mut static_file_producer = static_file_provider
.get_writer(start, StaticFileSegment::Transactions)?;
@@ -415,11 +416,16 @@ mod tests {
static_file_producer.append_transaction(tx_num, &transaction).map(drop)
})?;
let mut tx_block_writer = EitherWriter::new_transaction_blocks(
&*provider,
&static_file_provider,
start,
)?;
tx_block_writer.increment_block(progress.number)?;
if body.tx_count != 0 {
tx.put::<tables::TransactionBlocks>(
body.last_tx_num(),
progress.number,
)?;
tx_block_writer
.append_transaction_block(body.last_tx_num(), &progress.number)?;
}
tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
@@ -431,8 +437,9 @@ mod tests {
)?;
}
drop(tx_block_writer);
static_file_producer.commit()?;
tx.commit()?;
provider.commit()?;
}
}
self.responses.replace(
@@ -526,10 +533,11 @@ mod tests {
let static_file_provider = self.db.factory.static_file_provider();
self.db.query(|tx| {
let provider = self.db.factory.provider()?;
// Acquire cursors on body related tables
let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
let first_body_key = match bodies_cursor.first()? {
Some((key, _)) => key,
@@ -564,9 +572,9 @@ mod tests {
assert!(stored_ommers.is_some(), "Missing ommers entry");
}
let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
let tx_block_id = provider.block_by_transaction_id(body.last_tx_num())?;
if body.tx_count == 0 {
assert_ne!(tx_block_id,Some(number));
assert_ne!(tx_block_id, Some(number));
} else {
assert_eq!(tx_block_id, Some(number));
}

View File

@@ -3,11 +3,7 @@ use reth_config::config::SenderRecoveryConfig;
use reth_consensus::ConsensusError;
use reth_db::static_file::TransactionMask;
use reth_db_api::{
cursor::DbCursorRW,
table::Value,
tables,
transaction::{DbTx, DbTxMut},
DbTxUnwindExt, RawValue,
cursor::DbCursorRW, table::Value, tables, transaction::DbTxMut, DbTxUnwindExt, RawValue,
};
use reth_primitives_traits::{GotExpected, NodePrimitives, SignedTransaction};
use reth_provider::{
@@ -148,7 +144,7 @@ fn recover_range<Provider, CURSOR>(
senders_cursor: &mut CURSOR,
) -> Result<(), StageError>
where
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory,
Provider: DBProvider + HeaderProvider + StaticFileProviderFactory + BlockReader,
CURSOR: DbCursorRW<tables::TransactionSenders>,
{
debug!(target: "sync::stages::sender_recovery", ?tx_range, "Sending batch for processing");
@@ -181,8 +177,7 @@ where
SenderRecoveryStageError::FailedRecovery(err) => {
// get the block number for the bad transaction
let block_number = provider
.tx_ref()
.get::<tables::TransactionBlocks>(err.tx)?
.block_by_transaction_id(err.tx)?
.ok_or(ProviderError::BlockNumberForTransactionIndexNotFound)?;
// fetch the sealed header so we can use it in the sender recovery
@@ -374,7 +369,7 @@ mod tests {
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db_api::cursor::DbCursorRO;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_ethereum_primitives::{Block, TransactionSigned};
use reth_primitives_traits::{SealedBlock, SignerRecoverable};
use reth_provider::{

View File

@@ -35,6 +35,8 @@ pub enum StaticFileSegment {
Transactions,
/// Static File segment responsible for the `Receipts` table.
Receipts,
/// Static File segment responsible for the `TransactionBlocks` table.
TransactionBlocks,
}
impl StaticFileSegment {
@@ -46,13 +48,14 @@ impl StaticFileSegment {
Self::Headers => "headers",
Self::Transactions => "transactions",
Self::Receipts => "receipts",
Self::TransactionBlocks => "transaction-blocks",
}
}
/// Returns an iterator over all segments.
pub fn iter() -> impl Iterator<Item = Self> {
// The order of segments is significant and must be maintained to ensure correctness.
[Self::Headers, Self::Transactions, Self::Receipts].into_iter()
[Self::Headers, Self::Transactions, Self::Receipts, Self::TransactionBlocks].into_iter()
}
/// Returns the default configuration of the segment.
@@ -64,7 +67,7 @@ impl StaticFileSegment {
pub const fn columns(&self) -> usize {
match self {
Self::Headers => 3,
Self::Transactions | Self::Receipts => 1,
Self::Transactions | Self::Receipts | Self::TransactionBlocks => 1,
}
}
@@ -124,7 +127,7 @@ impl StaticFileSegment {
/// Returns `true` if a segment row is linked to a transaction.
pub const fn is_tx_based(&self) -> bool {
matches!(self, Self::Receipts | Self::Transactions)
matches!(self, Self::Receipts | Self::Transactions | Self::TransactionBlocks)
}
/// Returns `true` if a segment row is linked to a block.
@@ -484,6 +487,7 @@ mod tests {
StaticFileSegment::Headers => "headers",
StaticFileSegment::Transactions => "transactions",
StaticFileSegment::Receipts => "receipts",
StaticFileSegment::TransactionBlocks => "transaction-blocks",
};
assert_eq!(static_str, expected_str);
}
@@ -500,6 +504,7 @@ mod tests {
StaticFileSegment::Headers => "Headers",
StaticFileSegment::Transactions => "Transactions",
StaticFileSegment::Receipts => "Receipts",
StaticFileSegment::TransactionBlocks => "TransactionBlocks",
};
assert_eq!(ser, format!("\"{expected_str}\""));
}
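
A minimal sketch of how the new variant behaves, based on the `as_str`, `columns`, and `is_tx_based` arms added above (illustrative only, not part of this diff):

use reth_static_file_types::StaticFileSegment;

fn main() {
    let segment = StaticFileSegment::TransactionBlocks;
    // File name prefix introduced by this commit.
    assert_eq!(segment.as_str(), "transaction-blocks");
    // Single-column rows: the block number stored for a transaction number.
    assert_eq!(segment.columns(), 1);
    // Rows are keyed by transaction number, so the segment counts as tx-based.
    assert!(segment.is_tx_based());
}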

View File

@@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
/// These should be set during `init_genesis` or `init_db` depending on whether we want to dictate
/// the behaviour of new or old nodes respectively.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize, Compact)]
#[serde(default)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(compact)]
pub struct StorageSettings {
@@ -15,12 +16,14 @@ pub struct StorageSettings {
///
/// If this is set to FALSE AND receipt pruning IS ENABLED, all receipts should be written to DB. Otherwise, they should be written to static files. This ensures that older nodes do not need to migrate their current DB tables to static files. For more, read: <https://github.com/paradigmxyz/reth/issues/18890#issuecomment-3457760097>
pub receipts_in_static_files: bool,
/// Whether this node always writes the `TransactionBlocks` index (transaction number → block number) to static files.
pub transaction_blocks_in_static_files: bool,
}
impl StorageSettings {
/// Creates a new `StorageSettings` with default values.
pub const fn new() -> Self {
Self { receipts_in_static_files: false }
Self { receipts_in_static_files: false, transaction_blocks_in_static_files: false }
}
/// Creates `StorageSettings` for legacy nodes.
@@ -28,12 +31,18 @@ impl StorageSettings {
/// This explicitly sets `receipts_in_static_files` to `false`, ensuring older nodes
/// continue writing receipts to the database when receipt pruning is enabled.
pub const fn legacy() -> Self {
Self { receipts_in_static_files: false }
Self { receipts_in_static_files: false, transaction_blocks_in_static_files: false }
}
/// Sets the `receipts_static_files` flag to true.
/// Sets the `receipts_in_static_files` flag to true.
pub const fn with_receipts_in_static_files(mut self) -> Self {
self.receipts_in_static_files = true;
self
}
/// Sets the `transaction_blocks_in_static_files` flag to true.
pub const fn with_transaction_blocks_in_static_files(mut self) -> Self {
self.transaction_blocks_in_static_files = true;
self
}
}
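
A small sketch of how the new flag composes with the existing builder methods above (illustrative only; the `use` path for `StorageSettings` is an assumption):

use reth_storage_api::StorageSettings; // import path assumed for illustration

fn main() {
    // Legacy nodes keep both flags disabled.
    assert_eq!(StorageSettings::legacy(), StorageSettings::new());

    // Newly initialized nodes can opt the TransactionBlocks index into static files.
    let settings = StorageSettings::new()
        .with_receipts_in_static_files()
        .with_transaction_blocks_in_static_files();
    assert!(settings.receipts_in_static_files);
    assert!(settings.transaction_blocks_in_static_files);
}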

View File

@@ -42,3 +42,9 @@ add_static_file_mask! {
#[doc = "Mask for selecting a single transaction from Transactions static file segment"]
TransactionMask<T>, T, 0b1
}
// TRANSACTION BLOCKS MASKS
add_static_file_mask! {
#[doc = "Mask for selecting a single transaction block from `TransactionBlocks` static file segment"]
TransactionBlocksMask, alloy_primitives::BlockNumber, 0b1
}
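
A sketch of reading through the new mask, mirroring the `get_one::<TransactionBlocksMask>` call that `StaticFileProvider::block_by_transaction_id` makes later in this diff (cursor acquisition is elided, and the exact `StaticFileCursor` type shape is assumed):

use alloy_primitives::{BlockNumber, TxNumber};
use reth_db::static_file::{StaticFileCursor, TransactionBlocksMask};
use reth_storage_errors::provider::ProviderResult;

/// Reads the block number stored for `tx_num` from an already-opened cursor over a
/// `TransactionBlocks` static file.
fn block_of(
    cursor: &mut StaticFileCursor<'_>,
    tx_num: TxNumber,
) -> ProviderResult<Option<BlockNumber>> {
    cursor.get_one::<TransactionBlocksMask>(tx_num.into())
}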

View File

@@ -1,13 +1,23 @@
//! Generic writer abstraction for writing to either database tables or static files.
use crate::providers::StaticFileProviderRWRefMut;
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut},
DBProvider, StaticFileWriter,
};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_db::table::Value;
use reth_db_api::{cursor::DbCursorRW, tables};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
tables,
transaction::DbTxMut,
};
use reth_node_types::NodePrimitives;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeBounds;
/// Represents a destination for writing data, either to database or static files.
/// Represents a destination for writing/pruning data, either to database or static files.
#[derive(Debug)]
pub enum EitherWriter<'a, CURSOR, N> {
/// Write to database table via cursor
@@ -28,6 +38,31 @@ impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N> {
}
}
impl<'a, N: NodePrimitives> EitherWriter<'a, (), N> {
/// Creates a new `EitherWriter` for `TransactionBlocks` based on storage settings.
pub fn new_transaction_blocks<P>(
provider: &'a P,
static_file_provider: &'a StaticFileProvider<N>,
block_number: BlockNumber,
) -> ProviderResult<EitherWriter<'a, <P::Tx as DbTxMut>::CursorMut<tables::TransactionBlocks>, N>>
where
P: DBProvider + StorageSettingsCache,
P::Tx: DbTxMut,
{
let storage_settings = provider.cached_storage_settings();
if storage_settings.transaction_blocks_in_static_files {
Ok(EitherWriter::StaticFile(
static_file_provider
.get_writer(block_number, StaticFileSegment::TransactionBlocks)?,
))
} else {
Ok(EitherWriter::Database(
provider.tx_ref().cursor_write::<tables::TransactionBlocks>()?,
))
}
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
N::Receipt: Value,
@@ -41,3 +76,52 @@ where
}
}
}
impl<'a, CURSOR, N: NodePrimitives> EitherWriter<'a, CURSOR, N>
where
CURSOR: DbCursorRW<tables::TransactionBlocks> + DbCursorRO<tables::TransactionBlocks>,
{
/// Append a transaction block mapping to the destination
pub fn append_transaction_block(
&mut self,
tx_num: TxNumber,
block_number: &BlockNumber,
) -> ProviderResult<()> {
match self {
Self::Database(cursor) => Ok(cursor.append(tx_num, block_number)?),
Self::StaticFile(writer) => writer.append_transaction_block(tx_num, block_number),
}
}
/// Prune transaction blocks from the destination
pub fn prune_transaction_blocks(
self,
range: impl RangeBounds<TxNumber>,
last_block: BlockNumber,
) -> ProviderResult<()> {
match self {
Self::Database(mut cursor) => {
// Remove entries from the database table
let mut walker = cursor.walk_range(range)?;
while walker.next().transpose()?.is_some() {
walker.delete_current()?;
}
Ok(())
}
Self::StaticFile(mut writer) => {
// Calculate how many entries to delete from static files
let highest_static_file_tx =
writer.user_header().tx_range().map(|range| range.end()).unwrap_or_default();
let start = match range.start_bound() {
std::ops::Bound::Included(&n) => n,
std::ops::Bound::Excluded(&n) => n + 1,
std::ops::Bound::Unbounded => 0,
};
let to_delete = (highest_static_file_tx + 1).saturating_sub(start);
writer.prune_transaction_blocks(to_delete, last_block)
}
}
}
}
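
A sketch of the write path that the stage tests above now exercise: the destination is chosen from the cached storage settings, the writer's block range is advanced, and the last-tx → block mapping is appended only for non-empty blocks. `increment_block` is the `EitherWriter` method those tests call (not shown in this hunk), and the `reth_provider` import paths are assumptions; everything else is as defined in this file:

use alloy_primitives::{BlockNumber, TxNumber};
use reth_db_api::transaction::DbTxMut;
use reth_node_types::NodePrimitives;
use reth_provider::{providers::StaticFileProvider, DBProvider, EitherWriter};
use reth_storage_api::StorageSettingsCache;
use reth_storage_errors::provider::ProviderResult;

/// Appends the `last_tx_num -> block_number` entry for one block, routing to the
/// database table or the static file segment according to the node's settings.
fn append_block_mapping<'a, P, N>(
    provider: &'a P,
    static_file_provider: &'a StaticFileProvider<N>,
    block_number: BlockNumber,
    last_tx_num: TxNumber,
    tx_count: u64,
) -> ProviderResult<()>
where
    P: DBProvider + StorageSettingsCache,
    P::Tx: DbTxMut,
    N: NodePrimitives,
{
    let mut writer =
        EitherWriter::new_transaction_blocks(provider, static_file_provider, block_number)?;
    writer.increment_block(block_number)?;
    if tx_count != 0 {
        // Empty blocks have no transactions, so there is nothing to map.
        writer.append_transaction_block(last_tx_num, &block_number)?;
    }
    Ok(())
}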

View File

@@ -1183,11 +1183,15 @@ impl<TX: DbTx + 'static, N: NodeTypesForProvider> BlockReader for DatabaseProvid
}
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Ok(self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.seek(id)
.map(|b| b.map(|(_, bn)| bn))?)
if self.storage_settings.read().transaction_blocks_in_static_files {
self.static_file_provider.block_by_transaction_id(id)
} else {
Ok(self
.tx
.cursor_read::<tables::TransactionBlocks>()?
.seek(id)
.map(|b| b.map(|(_, bn)| bn))?)
}
}
}
@@ -2943,7 +2947,10 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
.ok_or(ProviderError::BlockBodyIndicesNotFound(block))?;
self.remove::<tables::BlockBodyIndices>(block + 1..)?;
self.remove::<tables::TransactionBlocks>(unwind_tx_from..)?;
// Handle TransactionBlocks pruning based on storage settings
EitherWriter::new_transaction_blocks(self, &self.static_file_provider, block)?
.prune_transaction_blocks(unwind_tx_from.., block)?;
let static_file_tx_num =
self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions);

View File

@@ -21,7 +21,7 @@ use reth_db::{
lockfile::StorageLock,
static_file::{
iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
StaticFileCursor, TransactionMask,
StaticFileCursor, TransactionBlocksMask, TransactionMask,
},
};
use reth_db_api::{
@@ -967,7 +967,9 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
for segment in StaticFileSegment::iter() {
match segment {
StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
StaticFileSegment::Headers |
StaticFileSegment::Transactions |
StaticFileSegment::TransactionBlocks => {}
StaticFileSegment::Receipts => {
if has_receipt_pruning {
// Pruned nodes (including full node) do not store receipts as static files.
@@ -1076,6 +1078,13 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
highest_tx,
highest_block,
)?,
StaticFileSegment::TransactionBlocks => self
.ensure_invariants::<_, tables::TransactionBlocks>(
provider,
segment,
highest_tx,
highest_block,
)?,
} {
update_unwind_target(unwind);
}
@@ -1163,6 +1172,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
StaticFileSegment::Headers => StageId::Headers,
StaticFileSegment::Transactions => StageId::Bodies,
StaticFileSegment::Receipts => StageId::Execution,
StaticFileSegment::TransactionBlocks => StageId::Bodies,
})?
.unwrap_or_default()
.block_number;
@@ -1976,8 +1986,16 @@ impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> Blo
Err(ProviderError::UnsupportedProvider)
}
fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
Err(ProviderError::UnsupportedProvider)
fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.get_segment_provider_for_transaction(StaticFileSegment::TransactionBlocks, id, None)
.and_then(|provider| provider.cursor()?.get_one::<TransactionBlocksMask>(id.into()))
.or_else(|err| {
if let ProviderError::MissingStaticFileTx(_, _) = err {
Ok(None)
} else {
Err(err)
}
})
}
}

View File

@@ -29,6 +29,7 @@ pub(crate) struct StaticFileWriters<N> {
headers: RwLock<Option<StaticFileProviderRW<N>>>,
transactions: RwLock<Option<StaticFileProviderRW<N>>>,
receipts: RwLock<Option<StaticFileProviderRW<N>>>,
transaction_blocks: RwLock<Option<StaticFileProviderRW<N>>>,
}
impl<N> Default for StaticFileWriters<N> {
@@ -37,6 +38,7 @@ impl<N> Default for StaticFileWriters<N> {
headers: Default::default(),
transactions: Default::default(),
receipts: Default::default(),
transaction_blocks: Default::default(),
}
}
}
@@ -51,6 +53,7 @@ impl<N: NodePrimitives> StaticFileWriters<N> {
StaticFileSegment::Headers => self.headers.write(),
StaticFileSegment::Transactions => self.transactions.write(),
StaticFileSegment::Receipts => self.receipts.write(),
StaticFileSegment::TransactionBlocks => self.transaction_blocks.write(),
};
if write_guard.is_none() {
@@ -244,6 +247,10 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
StaticFileSegment::Receipts => {
self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
}
StaticFileSegment::TransactionBlocks => self.prune_transaction_block_data(
to_delete,
last_block_number.expect("should exist"),
)?,
}
}
@@ -672,6 +679,35 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
Ok(Some(tx_number))
}
/// Appends a transaction → block number mapping to the static file.
///
/// It **DOES NOT** call `increment_block()`; the caller must handle that, because empty blocks
/// contain no transactions, so this function is never invoked for them even though their block
/// range still needs to be advanced.
pub fn append_transaction_block(
&mut self,
tx_num: TxNumber,
block_number: &BlockNumber,
) -> ProviderResult<()> {
let start = Instant::now();
self.ensure_no_queued_prune()?;
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionBlocks);
self.append_with_tx_number(tx_num, block_number)?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operations(
StaticFileSegment::TransactionBlocks,
StaticFileProviderOperation::Append,
1,
Some(start.elapsed()),
);
}
Ok(())
}
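// Caller-side sketch (not part of this diff): the block range is advanced by the caller
// before appending, and the mapping is written only for non-empty blocks, matching the
// note above and the `EitherWriter` usage in the stage tests:
//
//     writer.increment_block(block_number)?;
//     if tx_count != 0 {
//         writer.append_transaction_block(last_tx_num, &block_number)?;
//     }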
/// Adds an instruction to prune `to_delete` transactions during commit.
///
/// Note: `last_block` refers to the block the unwind ends at.
@@ -702,6 +738,18 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
self.queue_prune(to_delete, None)
}
/// Adds an instruction to prune `to_delete` transaction blocks during commit.
///
/// Note: `last_block` refers to the block the unwind ends at.
pub fn prune_transaction_blocks(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::TransactionBlocks);
self.queue_prune(to_delete, Some(last_block))
}
/// Adds an instruction to prune `to_delete` elements during commit.
///
/// Note: `last_block` refers to the block the unwind ends at if dealing with transaction-based
@@ -772,6 +820,29 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
Ok(())
}
/// Prunes the last `to_delete` transaction blocks from the data file.
fn prune_transaction_block_data(
&mut self,
to_delete: u64,
last_block: BlockNumber,
) -> ProviderResult<()> {
let start = Instant::now();
debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionBlocks);
self.truncate(to_delete, Some(last_block))?;
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
StaticFileSegment::TransactionBlocks,
StaticFileProviderOperation::Prune,
Some(start.elapsed()),
);
}
Ok(())
}
/// Prunes the last `to_delete` headers from the data file.
fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
let start = Instant::now();