Compare commits

...

18 Commits

Author SHA1 Message Date
Georgios Konstantopoulos
3c0b81dc71 fix(static-file): use with_len for ChangesetOffsetReader to respect committed length
- truncate_changesets: use with_len instead of new() to respect header's
  committed length, not raw file length (prevents reading uncommitted records)
- heal_changeset_sidecar: use with_len for consistency
- heal_changeset_sidecar: handle edge case where actual_rows=0 but
  committed_len>0 (all offsets invalid, truncate to 0)

Amp-Thread-ID: https://ampcode.com/threads/T-019c2569-ce48-7228-96ac-4f4a8db82a2a
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 22:49:14 +00:00
Georgios Konstantopoulos
04cf37ad4a fix(static-file): validate sidecar file size in ChangesetOffsetReader::with_len
If the header claims more records than the sidecar file contains,
return InvalidData error instead of allowing reads past EOF.

This prevents UnexpectedEof panics when the sidecar is corrupted
or truncated.

Amp-Thread-ID: https://ampcode.com/threads/T-019c2569-ce48-7228-96ac-4f4a8db82a2a
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 22:44:48 +00:00
Georgios Konstantopoulos
9edc72e278 fix(static-file): heal changeset sidecar after NippyJar healing
Move sidecar creation AFTER ensure_end_range_consistency() so that
if NippyJar heals and reduces rows, we validate sidecar offsets against
the new row count.

The new heal_changeset_sidecar() method:
1. Opens sidecar with committed_len from post-heal header
2. Checks if last offset points past actual rows
3. If so, finds the last valid block and truncates sidecar + header
4. Commits the healed state

This fixes a crash scenario where:
1. Sidecar + header committed with offsets pointing to rows 0-100
2. Crash, then NippyJar heals and truncates data to 80 rows
3. On open, sidecar offsets 80-100 now point past EOF
4. Read would fail with out-of-bounds error

Amp-Thread-ID: https://ampcode.com/threads/T-019c252f-6e40-728b-87a1-bbb4f0885215
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 21:27:17 +00:00
Georgios Konstantopoulos
20a182abc3 fix(static-file): add sync after sidecar truncate for crash safety
All set_len() calls on the sidecar file must be followed by sync_all()
for crash safety. Without sync, a crash could resurrect the old file
length.

Fixed in:
- ChangesetOffsetWriter::new() - sync after partial record truncation
- ChangesetOffsetWriter::new() - sync after uncommitted record truncation
- ChangesetOffsetWriter::truncate() - sync after prune truncation

Amp-Thread-ID: https://ampcode.com/threads/T-019c252f-6e40-728b-87a1-bbb4f0885215
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 21:11:14 +00:00
Georgios Konstantopoulos
1f951685cb refactor(static-file): move sidecar healing into ChangesetOffsetWriter::new
Move the truncate-to-committed-length logic from StaticFileProviderRW::new()
into ChangesetOffsetWriter::new() by adding a committed_len parameter.

ChangesetOffsetWriter::new() now:
1. Truncates partial records (crash mid-write)
2. Truncates extra complete records to match committed_len (crash after
   sidecar sync but before header commit)
3. Errors if sidecar has fewer records than committed (data corruption)

This mirrors NippyJar's healing behavior where config/header is the commit
boundary.

Amp-Thread-ID: https://ampcode.com/threads/T-019c252f-6e40-728b-87a1-bbb4f0885215
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 21:01:45 +00:00
Georgios Konstantopoulos
33e695349e fix(static-file): add 'Unlikely' to sidecar-shorter-than-header comment
Amp-Thread-ID: https://ampcode.com/threads/T-019c252f-6e40-728b-87a1-bbb4f0885215
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 20:58:12 +00:00
Georgios Konstantopoulos
5e2b06ba0c fix(static-file): truncate changeset offset sidecar to committed length on open
On writer open, heal the .csoff sidecar file to match the committed
header length. This fixes a crash safety bug where:

1. Sidecar is synced with N records
2. Crash before header commits (header still says len=N-1)
3. Restart: writer sees N records, appends at N+1
4. Result: permanent index shift corrupting future reads

Now the sidecar is truncated to the committed header length on open,
mirroring how NippyJar heals its offset/data files to match config.

Also adds test for this crash recovery scenario.

Amp-Thread-ID: https://ampcode.com/threads/T-019c252f-6e40-728b-87a1-bbb4f0885215
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 20:55:35 +00:00
Georgios Konstantopoulos
f0a9ac0ff6 chore: restore deleted code comment in binary search
The comment explains unexpected behavior when out of range during
binary search for account changeset fetch.

Amp-Thread-ID: https://ampcode.com/threads/T-019c251a-2593-726b-b374-a63fdc188b9f
Co-authored-by: Amp <amp@ampcode.com>
2026-02-03 20:04:19 +00:00
Dan Cline
3ad2425bd2 fix: tracing optional 2026-01-30 15:44:49 -05:00
Dan Cline
c263eac79a fix: feature propagation 2026-01-30 15:44:49 -05:00
Dan Cline
8ffcde8d1a fix: sync_all properly 2026-01-30 15:44:49 -05:00
Dan Cline
262149069a chore: fix tests etc 2026-01-30 15:44:49 -05:00
Georgios Konstantopoulos
4a5d43dc46 chore: fix clippy collapsible_if warning
Amp-Thread-ID: https://ampcode.com/threads/T-019c0be2-7d6c-75cf-b39e-aa4cbc782d51
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:44:49 -05:00
Georgios Konstantopoulos
17d3e80953 fix(static-file): avoid double sync_all in commit() that corrupts offsets
The original code called self.sync_all() before self.writer.commit(), but
commit() internally calls sync_all() again. This caused commit_offsets_inner()
to be called twice, which corrupted the offset file due to BufWriter position
desync when reading via get_ref().read_exact().

Now we only sync the changeset sidecar file in commit(), and let the main
writer be synced by its own commit() call.

Fixes test failures in stages::bodies::tests::{execute_body, full_body_download,
partial_body_download, sync_from_previous_progress}

Amp-Thread-ID: https://ampcode.com/threads/T-019c0be2-7d6c-75cf-b39e-aa4cbc782d51
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:44:49 -05:00
Georgios Konstantopoulos
5282231574 chore: fix trailing newline in Cargo.toml
Amp-Thread-ID: https://ampcode.com/threads/T-019c0bae-30ce-719b-890b-a06ba81aae40
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:44:49 -05:00
Georgios Konstantopoulos
ba9bc425d9 refactor: move sidecar sync to sync_all(), fix trie-sparse clippy
- Move changeset offsets sidecar sync logic from commit() to sync_all()
- commit() now calls sync_all() before writing config
- Fix dead_code warning for PARALLEL_PRUNE_THRESHOLD (cfg-gate to std)
- Fix needless_return in is_prune_parallelism_enabled

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b47-4de3-76b9-b0bd-54b8696cb9e2
2026-01-30 15:44:49 -05:00
Georgios Konstantopoulos
4e2fa3db44 chore: trigger CI
Amp-Thread-ID: https://ampcode.com/threads/T-019c0b47-4de3-76b9-b0bd-54b8696cb9e2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:36:31 -05:00
Georgios Konstantopoulos
34811b908c feat(static-file): incremental changeset offset storage
Replace inline Vec<ChangesetOffset> in SegmentHeader with a separate
.csoff sidecar file for incremental append/prune operations.

## Problem

Previously, changeset offsets were stored as Vec<ChangesetOffset> in
SegmentHeader and fully rewritten on every commit. For segments with
500k+ blocks, this meant ~8MB rewritten per commit even when appending
a single block.

## Solution

- Store offsets in a separate .csoff sidecar file (16-byte fixed records)
- SegmentHeader now stores only changeset_offsets_len: u64 (count)
- Append: O(total_blocks) -> O(1) (16 bytes per block)
- Prune: O(remaining_blocks) -> O(1) (truncate sidecar + update len)
- Commit overhead: ~8MB for 500k blocks -> ~100 bytes (header only)

## Breaking Change

This changes the static file format for changeset segments. Existing
changeset static files are not backwards compatible and must be
regenerated.

Amp-Thread-ID: https://ampcode.com/threads/T-019c0b47-4de3-76b9-b0bd-54b8696cb9e2
Co-authored-by: Amp <amp@ampcode.com>
2026-01-30 15:36:31 -05:00
14 changed files with 938 additions and 142 deletions

2
Cargo.lock generated
View File

@@ -10900,6 +10900,8 @@ dependencies = [
"serde",
"serde_json",
"strum 0.27.2",
"tempfile",
"tracing",
]
[[package]]

View File

@@ -67,6 +67,7 @@ tempfile.workspace = true
[features]
default = []
edge = ["reth-provider/edge"]
serde = [
"reth-exex-types/serde",
"reth-revm/serde",

View File

@@ -38,3 +38,4 @@ assert_matches.workspace = true
tempfile.workspace = true
[features]
edge = ["reth-stages/edge"]

View File

@@ -13,6 +13,7 @@ workspace = true
[dependencies]
alloy-primitives.workspace = true
tracing = { workspace = true, optional = true }
clap = { workspace = true, features = ["derive"], optional = true }
fixed-map.workspace = true
@@ -24,6 +25,7 @@ strum = { workspace = true, features = ["derive"] }
reth-nippy-jar.workspace = true
serde_json.workspace = true
insta.workspace = true
tempfile.workspace = true
[features]
default = ["std"]
@@ -34,5 +36,7 @@ std = [
"strum/std",
"serde_json/std",
"fixed-map/std",
"dep:tracing",
"tracing?/std",
]
clap = ["dep:clap"]

View File

@@ -0,0 +1,471 @@
//! Changeset offset sidecar file I/O.
//!
//! Provides append-only writing and O(1) random-access reading for changeset offsets.
//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`.
use crate::ChangesetOffset;
use std::{
fs::{File, OpenOptions},
io::{self, Read, Seek, SeekFrom, Write},
path::Path,
};
/// Writer for appending changeset offsets to a sidecar file.
#[derive(Debug)]
pub struct ChangesetOffsetWriter {
file: File,
/// Number of records written.
records_written: u64,
}
impl ChangesetOffsetWriter {
/// Record size in bytes.
const RECORD_SIZE: usize = 16;
/// Opens or creates the changeset offset file for appending.
///
/// The file is healed to match `committed_len` (from the segment header):
/// - Partial records (from crash mid-write) are truncated to record boundary
/// - Extra complete records (from crash after sidecar sync but before header commit)
/// are truncated to match the committed length
/// - If the file has fewer records than committed, returns an error (data corruption)
///
/// This mirrors NippyJar's healing behavior where config/header is the commit boundary.
pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.write(true)
.open(path.as_ref())?;
let file_len = file.metadata()?.len();
let remainder = file_len % Self::RECORD_SIZE as u64;
// First, truncate any partial record from crash mid-write
let aligned_len = if remainder != 0 {
let truncated_len = file_len - remainder;
#[cfg(feature = "std")]
tracing::warn!(
target: "reth::static_file",
path = %path.as_ref().display(),
original_len = file_len,
truncated_len,
"Truncating partial changeset offset record"
);
file.set_len(truncated_len)?;
file.sync_all()?; // Sync required for crash safety
truncated_len
} else {
file_len
};
let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
// Heal sidecar to match committed header length
match records_in_file.cmp(&committed_len) {
std::cmp::Ordering::Greater => {
// Sidecar has uncommitted records from a crash - truncate them
let target_len = committed_len * Self::RECORD_SIZE as u64;
#[cfg(feature = "std")]
tracing::warn!(
target: "reth::static_file",
path = %path.as_ref().display(),
sidecar_records = records_in_file,
committed_len,
"Truncating uncommitted changeset offset records after crash recovery"
);
file.set_len(target_len)?;
file.sync_all()?; // Sync required for crash safety
}
std::cmp::Ordering::Less => {
// Unlikely: sidecar is shorter than header claims - data corruption or
// incomplete prune
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Changeset offset sidecar has {} records but header expects {}: {}",
records_in_file,
committed_len,
path.as_ref().display()
),
));
}
std::cmp::Ordering::Equal => {}
}
let records_written = committed_len;
let file = OpenOptions::new().create(true).append(true).open(path)?;
Ok(Self { file, records_written })
}
/// Appends a single changeset offset record.
pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
let mut buf = [0u8; Self::RECORD_SIZE];
buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
self.file.write_all(&buf)?;
self.records_written += 1;
Ok(())
}
/// Appends multiple changeset offset records.
pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
for offset in offsets {
self.append(offset)?;
}
Ok(())
}
/// Syncs all data to disk. Must be called before committing the header.
pub fn sync(&mut self) -> io::Result<()> {
self.file.sync_all()
}
/// Truncates the file to contain exactly `len` records and syncs to disk.
/// Used after prune operations to reclaim space.
///
/// The sync is required for crash safety - without it, a crash could
/// resurrect the old file length.
pub fn truncate(&mut self, len: u64) -> io::Result<()> {
self.file.set_len(len * Self::RECORD_SIZE as u64)?;
self.file.sync_all()?;
self.records_written = len;
Ok(())
}
/// Returns the number of records in the file.
pub const fn len(&self) -> u64 {
self.records_written
}
/// Returns true if the file is empty.
pub const fn is_empty(&self) -> bool {
self.records_written == 0
}
}
/// Reader for changeset offsets with O(1) random access.
#[derive(Debug)]
pub struct ChangesetOffsetReader {
file: File,
/// Cached file length in records.
len: u64,
}
impl ChangesetOffsetReader {
/// Record size in bytes.
const RECORD_SIZE: usize = 16;
/// Opens the changeset offset file for reading.
pub fn new(path: impl AsRef<Path>) -> io::Result<Self> {
let file = File::open(path)?;
let len = file.metadata()?.len() / Self::RECORD_SIZE as u64;
Ok(Self { file, len })
}
/// Opens with an explicit length (from header metadata).
/// Any records beyond `len` are ignored.
///
/// Returns an error if the file has fewer records than `len` (data corruption).
pub fn with_len(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
let file = File::open(&path)?;
let file_len = file.metadata()?.len();
let records_in_file = file_len / Self::RECORD_SIZE as u64;
if records_in_file < len {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Changeset offset sidecar has {} records but expected at least {}: {}",
records_in_file,
len,
path.as_ref().display()
),
));
}
Ok(Self { file, len })
}
/// Reads a single changeset offset by block index.
/// Returns None if index is out of bounds.
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
if block_index >= self.len {
return Ok(None);
}
let byte_pos = block_index * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut buf = [0u8; Self::RECORD_SIZE];
self.file.read_exact(&mut buf)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
Ok(Some(ChangesetOffset::new(offset, num_changes)))
}
/// Reads a range of changeset offsets.
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
let end = end.min(self.len);
if start >= end {
return Ok(Vec::new());
}
let count = (end - start) as usize;
let byte_pos = start * Self::RECORD_SIZE as u64;
self.file.seek(SeekFrom::Start(byte_pos))?;
let mut result = Vec::with_capacity(count);
let mut buf = [0u8; Self::RECORD_SIZE];
for _ in 0..count {
self.file.read_exact(&mut buf)?;
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
result.push(ChangesetOffset::new(offset, num_changes));
}
Ok(result)
}
/// Returns the number of valid records.
pub const fn len(&self) -> u64 {
self.len
}
/// Returns true if there are no records.
pub const fn is_empty(&self) -> bool {
self.len == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[test]
fn test_write_and_read() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Write (new file, committed_len=0)
{
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 5)).unwrap();
writer.append(&ChangesetOffset::new(5, 3)).unwrap();
writer.append(&ChangesetOffset::new(8, 10)).unwrap();
writer.sync().unwrap();
assert_eq!(writer.len(), 3);
}
// Read
{
let mut reader = ChangesetOffsetReader::new(&path).unwrap();
assert_eq!(reader.len(), 3);
let entry = reader.get(0).unwrap().unwrap();
assert_eq!(entry.offset(), 0);
assert_eq!(entry.num_changes(), 5);
let entry = reader.get(1).unwrap().unwrap();
assert_eq!(entry.offset(), 5);
assert_eq!(entry.num_changes(), 3);
let entry = reader.get(2).unwrap().unwrap();
assert_eq!(entry.offset(), 8);
assert_eq!(entry.num_changes(), 10);
assert!(reader.get(3).unwrap().is_none());
}
}
#[test]
fn test_truncate() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 1)).unwrap();
writer.append(&ChangesetOffset::new(1, 2)).unwrap();
writer.append(&ChangesetOffset::new(3, 3)).unwrap();
writer.sync().unwrap();
writer.truncate(2).unwrap();
assert_eq!(writer.len(), 2);
let mut reader = ChangesetOffsetReader::new(&path).unwrap();
assert_eq!(reader.len(), 2);
assert!(reader.get(2).unwrap().is_none());
}
#[test]
fn test_partial_record_recovery() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Write 1 full record (16 bytes) + 8 trailing bytes (partial record)
{
let mut file = std::fs::File::create(&path).unwrap();
// Full record: offset=100, num_changes=5
file.write_all(&100u64.to_le_bytes()).unwrap();
file.write_all(&5u64.to_le_bytes()).unwrap();
// Partial record: only 8 bytes (incomplete)
file.write_all(&200u64.to_le_bytes()).unwrap();
file.sync_all().unwrap();
}
// Verify file has 24 bytes before opening with writer
assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);
// Open with writer, committed_len=1 (header committed 1 record)
// Should truncate partial record and match committed length
let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
assert_eq!(writer.len(), 1);
// Verify file was truncated to 16 bytes
assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);
// Verify the complete record is readable
let mut reader = ChangesetOffsetReader::new(&path).unwrap();
assert_eq!(reader.len(), 1);
let entry = reader.get(0).unwrap().unwrap();
assert_eq!(entry.offset(), 100);
assert_eq!(entry.num_changes(), 5);
}
#[test]
fn test_with_len_bounds_reads() {
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Write 3 records
{
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 10)).unwrap();
writer.append(&ChangesetOffset::new(10, 20)).unwrap();
writer.append(&ChangesetOffset::new(30, 30)).unwrap();
writer.sync().unwrap();
assert_eq!(writer.len(), 3);
}
// Open with explicit len=2, ignoring the 3rd record
let mut reader = ChangesetOffsetReader::with_len(&path, 2).unwrap();
assert_eq!(reader.len(), 2);
// First two records should be readable
let entry0 = reader.get(0).unwrap().unwrap();
assert_eq!(entry0.offset(), 0);
assert_eq!(entry0.num_changes(), 10);
let entry1 = reader.get(1).unwrap().unwrap();
assert_eq!(entry1.offset(), 10);
assert_eq!(entry1.num_changes(), 20);
// Third record should be out of bounds (due to len=2)
assert!(reader.get(2).unwrap().is_none());
// get_range should also respect the len bound
let range = reader.get_range(0, 5).unwrap();
assert_eq!(range.len(), 2);
}
#[test]
fn test_truncate_uncommitted_records_on_open() {
// Simulates crash recovery where sidecar has more records than committed header length.
// ChangesetOffsetWriter::new() should automatically truncate to committed_len.
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Simulate: wrote 3 records, synced sidecar, but header only committed len=2
{
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 5)).unwrap();
writer.append(&ChangesetOffset::new(5, 10)).unwrap();
writer.append(&ChangesetOffset::new(15, 7)).unwrap(); // uncommitted
writer.sync().unwrap();
assert_eq!(writer.len(), 3);
}
// On "restart", new() heals by truncating to committed length
let committed_len = 2u64;
{
let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
assert_eq!(writer.len(), 2); // Healed to committed length
}
// Verify file is now correct length and new appends go to the right place
{
let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
assert_eq!(writer.len(), 2);
// Append a new record - should be at index 2, not index 3
writer.append(&ChangesetOffset::new(15, 20)).unwrap();
writer.sync().unwrap();
assert_eq!(writer.len(), 3);
}
// Verify the records are correct
{
let mut reader = ChangesetOffsetReader::new(&path).unwrap();
assert_eq!(reader.len(), 3);
let entry0 = reader.get(0).unwrap().unwrap();
assert_eq!(entry0.offset(), 0);
assert_eq!(entry0.num_changes(), 5);
let entry1 = reader.get(1).unwrap().unwrap();
assert_eq!(entry1.offset(), 5);
assert_eq!(entry1.num_changes(), 10);
// This should be the NEW record, not the old uncommitted one
let entry2 = reader.get(2).unwrap().unwrap();
assert_eq!(entry2.offset(), 15);
assert_eq!(entry2.num_changes(), 20); // Not 7 from the old uncommitted record
}
}
#[test]
fn test_sidecar_shorter_than_committed_errors() {
// If sidecar has fewer records than committed, it's data corruption - should error.
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Write 1 record
{
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 5)).unwrap();
writer.sync().unwrap();
}
// Try to open with committed_len=3 (header claims more than file has)
let result = ChangesetOffsetWriter::new(&path, 3);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
#[test]
fn test_reader_with_len_shorter_than_file_errors() {
// If header claims more records than file has, with_len should error (data corruption).
let dir = tempdir().unwrap();
let path = dir.path().join("test.csoff");
// Write 1 record
{
let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
writer.append(&ChangesetOffset::new(0, 5)).unwrap();
writer.sync().unwrap();
}
// Try to open reader with len=3 (header claims more than file has)
let result = ChangesetOffsetReader::with_len(&path, 3);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
}
}

View File

@@ -15,11 +15,18 @@ mod compression;
mod event;
mod segment;
#[cfg(feature = "std")]
mod changeset_offsets;
#[cfg(feature = "std")]
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
use alloy_primitives::BlockNumber;
pub use compression::Compression;
use core::ops::RangeInclusive;
pub use event::StaticFileProducerEvent;
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
pub use segment::{
ChangesetOffset, SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
};
/// Map keyed by [`StaticFileSegment`].
pub type StaticFileMap<T> = alloc::boxed::Box<fixed_map::Map<StaticFileSegment, T>>;

View File

@@ -1,5 +1,5 @@
use crate::{find_fixed_range, BlockNumber, Compression};
use alloc::{format, string::String, vec::Vec};
use alloc::{format, string::String};
use alloy_primitives::TxNumber;
use core::{
ops::{Range, RangeInclusive},
@@ -211,6 +211,11 @@ pub struct ChangesetOffset {
}
impl ChangesetOffset {
/// Creates a new changeset offset.
pub const fn new(offset: u64, num_changes: u64) -> Self {
Self { offset, num_changes }
}
/// Returns the start offset for the row for this block
pub const fn offset(&self) -> u64 {
self.offset
@@ -225,6 +230,11 @@ impl ChangesetOffset {
pub const fn changeset_range(&self) -> Range<u64> {
self.offset..(self.offset + self.num_changes)
}
/// Increments the number of changes by 1.
pub const fn increment_num_changes(&mut self) {
self.num_changes += 1;
}
}
/// A segment header that contains information common to all segments. Used for storage.
@@ -242,7 +252,7 @@ pub struct SegmentHeader {
/// Segment type
segment: StaticFileSegment,
/// List of offsets, for where each block's changeset starts.
changeset_offsets: Option<Vec<ChangesetOffset>>,
changeset_offsets_len: u64,
}
struct SegmentHeaderVisitor;
@@ -271,21 +281,18 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
let segment: StaticFileSegment =
seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
let changeset_offsets = if segment.is_change_based() {
// Try to read the 5th field (changeset_offsets)
// If it doesn't exist (old format), this will return None
match seq.next_element()? {
Some(Some(offsets)) => Some(offsets),
// Changesets should have offsets
Some(None) => None,
let changeset_offsets_len = if segment.is_change_based() {
// Try to read the 5th field (changeset_offsets_len)
match seq.next_element::<u64>()? {
Some(len) => len,
None => {
return Err(serde::de::Error::custom(
"changeset_offsets should exist for static files",
"changeset_offsets_len should exist for changeset static files",
))
}
}
} else {
None
0
};
Ok(SegmentHeader {
@@ -293,7 +300,7 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
block_range,
tx_range,
segment,
changeset_offsets,
changeset_offsets_len,
})
}
}
@@ -328,7 +335,7 @@ impl Serialize for SegmentHeader {
state.serialize_field("segment", &self.segment)?;
if self.segment.is_change_based() {
state.serialize_field("changeset_offsets", &self.changeset_offsets)?;
state.serialize_field("changeset_offsets", &self.changeset_offsets_len)?;
}
state.end()
@@ -343,7 +350,7 @@ impl SegmentHeader {
tx_range: Option<SegmentRangeInclusive>,
segment: StaticFileSegment,
) -> Self {
Self { expected_block_range, block_range, tx_range, segment, changeset_offsets: None }
Self { expected_block_range, block_range, tx_range, segment, changeset_offsets_len: 0 }
}
/// Returns the static file segment kind.
@@ -366,9 +373,20 @@ impl SegmentHeader {
self.tx_range
}
/// Returns the changeset offsets.
pub const fn changeset_offsets(&self) -> Option<&Vec<ChangesetOffset>> {
self.changeset_offsets.as_ref()
/// Returns the number of changeset offset entries.
/// The actual offsets are stored in the .csoff sidecar file.
pub const fn changeset_offsets_len(&self) -> u64 {
self.changeset_offsets_len
}
/// Sets the changeset offsets length.
pub const fn set_changeset_offsets_len(&mut self, len: u64) {
self.changeset_offsets_len = len;
}
/// Increments the changeset offsets length by 1.
pub const fn increment_changeset_offsets_len(&mut self) {
self.changeset_offsets_len += 1;
}
/// The expected block start of the segment.
@@ -426,7 +444,7 @@ impl SegmentHeader {
}
/// Increments block end range depending on segment
pub fn increment_block(&mut self) -> BlockNumber {
pub const fn increment_block(&mut self) -> BlockNumber {
let block_num = if let Some(block_range) = &mut self.block_range {
block_range.end += 1;
block_range.end
@@ -438,20 +456,10 @@ impl SegmentHeader {
self.expected_block_start()
};
// For changeset segments, initialize an offset entry for the new block
// For changeset segments, increment the offsets length.
// The actual offset entry is written to the sidecar file by the caller.
if self.segment.is_change_based() {
let offsets = self.changeset_offsets.get_or_insert_default();
// Calculate the offset for the new block
let new_offset = if let Some(last_offset) = offsets.last() {
// The new block starts after the last block's changes
last_offset.offset + last_offset.num_changes
} else {
// First block starts at offset 0
0
};
// Add a new offset entry with 0 changes initially
offsets.push(ChangesetOffset { offset: new_offset, num_changes: 0 });
self.changeset_offsets_len += 1;
}
block_num
@@ -468,23 +476,11 @@ impl SegmentHeader {
}
}
/// Increments the latest block's number of changes.
pub fn increment_block_changes(&mut self) {
debug_assert!(self.segment().is_change_based());
if self.segment.is_change_based() {
let offsets = self.changeset_offsets.get_or_insert_with(Default::default);
if let Some(last_offset) = offsets.last_mut() {
last_offset.num_changes += 1;
} else {
// If offsets is empty, we are adding the first change for a block
// The offset for the first block is 0
offsets.push(ChangesetOffset { offset: 0, num_changes: 1 });
}
}
}
/// Removes `num` elements from end of tx or block range.
pub fn prune(&mut self, num: u64) {
///
/// For changeset segments, also decrements the changeset offsets length.
/// The caller must truncate the sidecar file accordingly.
pub const fn prune(&mut self, num: u64) {
// Changesets also contain a block range, but are not strictly block-based
if self.segment.is_block_or_change_based() {
if let Some(range) = &mut self.block_range {
@@ -492,26 +488,18 @@ impl SegmentHeader {
self.block_range = None;
// Clear all changeset offsets if we're clearing all blocks
if self.segment.is_change_based() {
self.changeset_offsets = None;
self.changeset_offsets_len = 0;
}
} else {
let old_end = range.end;
range.end = range.end.saturating_sub(num);
// Update changeset offsets for account changesets
if self.segment.is_change_based() &&
let Some(offsets) = &mut self.changeset_offsets
{
// Update changeset offsets length for changeset segments
if self.segment.is_change_based() {
// Calculate how many blocks we're removing
let blocks_to_remove = old_end - range.end;
// Remove the last `blocks_to_remove` entries from offsets
let new_len = offsets.len().saturating_sub(blocks_to_remove as usize);
offsets.truncate(new_len);
// If we removed all offsets, set to None
if offsets.is_empty() {
self.changeset_offsets = None;
}
self.changeset_offsets_len =
self.changeset_offsets_len.saturating_sub(blocks_to_remove);
}
}
};
@@ -534,28 +522,24 @@ impl SegmentHeader {
}
}
/// Synchronizes changeset offsets with the current block range for account changeset segments.
/// Synchronizes changeset offsets length with the current block range for changeset segments.
///
/// This should be called after modifying the block range when dealing with changeset segments
/// to ensure the offsets vector matches the block range size.
pub fn sync_changeset_offsets(&mut self) {
/// to ensure the offsets length matches the block range size.
/// The caller must also truncate the sidecar file accordingly.
pub const fn sync_changeset_offsets(&mut self) {
if !self.segment.is_change_based() {
return;
}
if let Some(block_range) = &self.block_range {
if let Some(offsets) = &mut self.changeset_offsets {
let expected_len = (block_range.end - block_range.start + 1) as usize;
if offsets.len() > expected_len {
offsets.truncate(expected_len);
if offsets.is_empty() {
self.changeset_offsets = None;
}
}
let expected_len = block_range.end - block_range.start + 1;
if self.changeset_offsets_len > expected_len {
self.changeset_offsets_len = expected_len;
}
} else {
// No block range means no offsets
self.changeset_offsets = None;
self.changeset_offsets_len = 0;
}
}
@@ -581,21 +565,22 @@ impl SegmentHeader {
self.tx_start()
}
/// Returns the `ChangesetOffset` corresponding for the given block, if it's in the block
/// range.
/// Returns the index into the sidecar file for a given block's changeset offset.
///
/// If it is not in the block range or the changeset list in the header does not contain a
/// value for the block, this returns `None`.
pub fn changeset_offset(&self, block: BlockNumber) -> Option<&ChangesetOffset> {
/// Returns `None` if the block is not in the block range.
/// The caller must read the actual offset from the sidecar file at this index.
pub fn changeset_offset_index(&self, block: BlockNumber) -> Option<u64> {
let block_range = self.block_range()?;
if !block_range.contains(block) {
return None
return None;
}
let offsets = self.changeset_offsets.as_ref()?;
let index = (block - block_range.start()) as usize;
let index = block - block_range.start();
if index >= self.changeset_offsets_len {
return None;
}
offsets.get(index)
Some(index)
}
}
@@ -754,42 +739,42 @@ mod tests {
block_range: Some(SegmentRangeInclusive::new(0, 100)),
tx_range: None,
segment: StaticFileSegment::Headers,
changeset_offsets: None,
changeset_offsets_len: 0,
},
SegmentHeader {
expected_block_range: SegmentRangeInclusive::new(0, 200),
block_range: None,
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
segment: StaticFileSegment::Transactions,
changeset_offsets: None,
changeset_offsets_len: 0,
},
SegmentHeader {
expected_block_range: SegmentRangeInclusive::new(0, 200),
block_range: Some(SegmentRangeInclusive::new(0, 100)),
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
segment: StaticFileSegment::Receipts,
changeset_offsets: None,
changeset_offsets_len: 0,
},
SegmentHeader {
expected_block_range: SegmentRangeInclusive::new(0, 200),
block_range: Some(SegmentRangeInclusive::new(0, 100)),
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
segment: StaticFileSegment::TransactionSenders,
changeset_offsets: None,
changeset_offsets_len: 0,
},
SegmentHeader {
expected_block_range: SegmentRangeInclusive::new(0, 200),
block_range: Some(SegmentRangeInclusive::new(0, 100)),
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
segment: StaticFileSegment::AccountChangeSets,
changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
changeset_offsets_len: 100,
},
SegmentHeader {
expected_block_range: SegmentRangeInclusive::new(0, 200),
block_range: Some(SegmentRangeInclusive::new(0, 100)),
tx_range: None,
segment: StaticFileSegment::StorageChangeSets,
changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
changeset_offsets_len: 100,
},
];
// Check that we test all segments

View File

@@ -2,4 +2,4 @@
source: crates/static-file/types/src/segment.rs
expression: "Bytes::from(serialized)"
---
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c01000000000000040000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c0100000000000004000000640000000000000001000000000000000000000000000000000000000000000000

View File

@@ -2,4 +2,4 @@
source: crates/static-file/types/src/segment.rs
expression: "Bytes::from(serialized)"
---
0x01000000000000000000000000000000c800000000000000010000000000000000640000000000000000050000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000005000000640000000000000001000000000000000000000000000000000000000000000000

View File

@@ -59,6 +59,8 @@ const INDEX_FILE_EXTENSION: &str = "idx";
const OFFSETS_FILE_EXTENSION: &str = "off";
/// The file extension used for configuration files.
pub const CONFIG_FILE_EXTENSION: &str = "conf";
/// The file extension used for changeset offset sidecar files.
pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff";
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
/// memory-mapped file.
@@ -240,13 +242,22 @@ impl<H: NippyJarHeader> NippyJar<H> {
self.path.with_extension(CONFIG_FILE_EXTENSION)
}
/// Returns the path for the changeset offsets sidecar file.
pub fn changeset_offsets_path(&self) -> PathBuf {
self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION)
}
/// Deletes from disk this [`NippyJar`] alongside every satellite file.
pub fn delete(self) -> Result<(), NippyJarError> {
// TODO(joshie): ensure consistency on unexpected shutdown
for path in
[self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
{
for path in [
self.data_path().into(),
self.index_path(),
self.offsets_path(),
self.config_path(),
self.changeset_offsets_path(),
] {
if path.exists() {
debug!(target: "nippy-jar", ?path, "Removing file.");
reth_fs_util::remove_file(path)?;

View File

@@ -17,6 +17,7 @@ use reth_db::static_file::{
use reth_db_api::table::{Decompress, Value};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{SealedHeader, SignedTransaction};
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
use reth_storage_api::range_size_hint;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
@@ -90,6 +91,61 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
pub fn size(&self) -> usize {
self.jar.value().size()
}
/// Reads a changeset offset from the sidecar file for a given block.
///
/// Returns `None` if:
/// - The segment is not change-based
/// - The block is not in the block range
/// - The sidecar file doesn't exist
pub fn read_changeset_offset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Option<ChangesetOffset>> {
let header = self.user_header();
if !header.segment().is_change_based() {
return Ok(None);
}
let Some(index) = header.changeset_offset_index(block_number) else {
return Ok(None);
};
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
}
let mut reader = ChangesetOffsetReader::new(&csoff_path).map_err(ProviderError::other)?;
reader.get(index).map_err(ProviderError::other)
}
/// Reads all changeset offsets from the sidecar file.
///
/// Returns `None` if:
/// - The segment is not change-based
/// - The sidecar file doesn't exist
pub fn read_changeset_offsets(&self) -> ProviderResult<Option<Vec<ChangesetOffset>>> {
let header = self.user_header();
if !header.segment().is_change_based() {
return Ok(None);
}
let len = header.changeset_offsets_len();
if len == 0 {
return Ok(Some(Vec::new()));
}
let csoff_path = self.data_path().with_extension("csoff");
if !csoff_path.exists() {
return Ok(None);
}
let mut reader =
ChangesetOffsetReader::with_len(&csoff_path, len).map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
Ok(Some(offsets))
}
}
impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProvider<'_, N> {

View File

@@ -39,8 +39,8 @@ use reth_primitives_traits::{
};
use reth_stages_types::{PipelineTarget, StageId};
use reth_static_file_types::{
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
};
use reth_storage_api::{
BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageChangeSetReader,
@@ -961,10 +961,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
) -> ProviderResult<SegmentHeader> {
let fixed_block_range = self.find_fixed_range(segment, block);
let key = (fixed_block_range.end(), segment);
let file = self.path.join(segment.filename(&fixed_block_range));
let jar = if let Some((_, jar)) = self.map.remove(&key) {
jar.jar
} else {
let file = self.path.join(segment.filename(&fixed_block_range));
debug!(
target: "provider::static_file",
?file,
@@ -976,6 +976,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
};
let header = jar.user_header().clone();
// Delete the sidecar file for changeset segments before deleting the main jar
if segment.is_change_based() {
let csoff_path = file.with_extension("csoff");
if csoff_path.exists() {
std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
}
}
jar.delete().map_err(ProviderError::other)?;
// SAFETY: this is currently necessary to ensure that certain indexes like
@@ -2295,7 +2304,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
Err(err) => return Err(err),
};
if let Some(offset) = provider.user_header().changeset_offset(block_number) {
if let Some(offset) = provider.read_changeset_offset(block_number)? {
let mut cursor = provider.cursor()?;
let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
@@ -2327,9 +2336,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
Err(err) => return Err(err),
};
let user_header = provider.user_header();
let Some(offset) = user_header.changeset_offset(block_number) else {
let Some(offset) = provider.read_changeset_offset(block_number)? else {
return Ok(None);
};
@@ -2388,12 +2395,19 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
fn account_changeset_count(&self) -> ProviderResult<usize> {
let mut count = 0;
// iterate through static files and sum changeset metadata via each static file header
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
for (_, header) in changeset_segments {
if let Some(changeset_offsets) = header.changeset_offsets() {
for offset in changeset_offsets {
for (block_range, header) in changeset_segments {
let csoff_path = self
.path
.join(StaticFileSegment::AccountChangeSets.filename(block_range))
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::with_len(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
}
}
@@ -2419,7 +2433,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
Err(err) => return Err(err),
};
if let Some(offset) = provider.user_header().changeset_offset(block_number) {
if let Some(offset) = provider.read_changeset_offset(block_number)? {
let mut cursor = provider.cursor()?;
let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
@@ -2452,8 +2466,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
Err(err) => return Err(err),
};
let user_header = provider.user_header();
let Some(offset) = user_header.changeset_offset(block_number) else {
let Some(offset) = provider.read_changeset_offset(block_number)? else {
return Ok(None);
};
@@ -2510,9 +2523,17 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
for (_, header) in changeset_segments {
if let Some(changeset_offsets) = header.changeset_offsets() {
for offset in changeset_offsets {
for (block_range, header) in changeset_segments {
let csoff_path = self
.path
.join(StaticFileSegment::StorageChangeSets.filename(block_range))
.with_extension("csoff");
if csoff_path.exists() {
let len = header.changeset_offsets_len();
let mut reader = ChangesetOffsetReader::with_len(&csoff_path, len)
.map_err(ProviderError::other)?;
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
for offset in offsets {
count += offset.num_changes() as usize;
}
}

View File

@@ -743,8 +743,9 @@ mod tests {
.unwrap();
// Check that the segment header has changeset offsets
assert!(provider.user_header().changeset_offsets().is_some());
let offsets = provider.user_header().changeset_offsets().unwrap();
let offsets = provider.read_changeset_offsets().unwrap();
assert!(offsets.is_some());
let offsets = offsets.unwrap();
assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets
// Verify each block has the expected number of changes
@@ -854,7 +855,8 @@ mod tests {
let (static_dir, _) = create_test_static_files_dir();
let blocks_per_file = 10;
let files_per_range = 3;
// 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
let files_per_range = 4;
let file_set_count = 3;
let initial_file_count = files_per_range * file_set_count;
let tip = blocks_per_file * file_set_count - 1;
@@ -923,7 +925,7 @@ mod tests {
)?;
// Check offsets are valid
let offsets = provider.user_header().changeset_offsets();
let offsets = provider.read_changeset_offsets().unwrap();
assert!(offsets.is_some(), "Should have changeset offsets");
}
@@ -1087,8 +1089,9 @@ mod tests {
.unwrap();
// Check that the segment header has changeset offsets
assert!(provider.user_header().changeset_offsets().is_some());
let offsets = provider.user_header().changeset_offsets().unwrap();
let offsets = provider.read_changeset_offsets().unwrap();
assert!(offsets.is_some());
let offsets = offsets.unwrap();
assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets
// Verify each block has the expected number of changes
@@ -1189,7 +1192,8 @@ mod tests {
let (static_dir, _) = create_test_static_files_dir();
let blocks_per_file = 10;
let files_per_range = 3;
// 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
let files_per_range = 4;
let file_set_count = 3;
let initial_file_count = files_per_range * file_set_count;
let tip = blocks_per_file * file_set_count - 1;
@@ -1248,7 +1252,7 @@ mod tests {
tip,
None,
)?;
let offsets = provider.user_header().changeset_offsets();
let offsets = provider.read_changeset_offsets()?;
assert!(offsets.is_some(), "Should have changeset offsets");
}
@@ -1351,4 +1355,42 @@ mod tests {
}
}
}
#[test]
fn test_last_block_flushed_on_commit() {
let (static_dir, _) = create_test_static_files_dir();
let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
.expect("Failed to create static file provider");
let address = Address::from([5u8; 20]);
let key = B256::with_last_byte(1);
// Write changes for a single block without calling increment_block explicitly
// (append_storage_changeset calls it internally), then commit
{
let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
// Append a single block's changeset (block 0)
writer
.append_storage_changeset(
vec![StorageBeforeTx { address, key, value: U256::from(42) }],
0,
)
.unwrap();
// Commit without any subsequent block - the current block's offset should be flushed
writer.commit().unwrap();
}
// Verify highest block is 0
let highest = sf_rw.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
assert_eq!(highest, Some(0), "Should have block 0 after commit");
// Verify the data is actually readable via the high-level API
let result = sf_rw.get_storage_before_block(0, address, key).unwrap();
assert!(result.is_some(), "Should be able to read the changeset entry");
let entry = result.unwrap();
assert_eq!(entry.value, U256::from(42));
}
}

View File

@@ -10,7 +10,10 @@ use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
use reth_static_file_types::{
ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
SegmentRangeInclusive, StaticFileSegment,
};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
borrow::Borrow,
@@ -219,6 +222,10 @@ pub struct StaticFileProviderRW<N> {
prune_on_commit: Option<PruneStrategy>,
/// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
synced: bool,
/// Changeset offsets sidecar writer (only for changeset segments).
changeset_offsets: Option<ChangesetOffsetWriter>,
/// Current block's changeset offset being written.
current_changeset_offset: Option<ChangesetOffset>,
}
impl<N: NodePrimitives> StaticFileProviderRW<N> {
@@ -233,6 +240,8 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
metrics: Option<Arc<StaticFileProviderMetrics>>,
) -> ProviderResult<Self> {
let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
// Create writer WITHOUT sidecar first - we'll add it after healing
let mut writer = Self {
writer,
data_path,
@@ -241,10 +250,19 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
metrics,
prune_on_commit: None,
synced: false,
changeset_offsets: None,
current_changeset_offset: None,
};
// Run NippyJar healing BEFORE setting up changeset sidecar
// This may reduce rows, which affects valid sidecar offsets
writer.ensure_end_range_consistency()?;
// Now set up changeset sidecar with post-heal header values
if segment.is_change_based() {
writer.heal_changeset_sidecar()?;
}
Ok(writer)
}
@@ -336,7 +354,112 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
self.prune_on_commit.is_some()
}
/// Syncs all data (rows and offsets) to disk.
/// Heals the changeset offset sidecar after NippyJar healing.
///
/// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
/// If offsets point past the current row count, we truncate the sidecar to match.
fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
let csoff_path = self.data_path.with_extension("csoff");
let committed_len = self.writer.user_header().changeset_offsets_len();
let actual_rows = self.writer.rows() as u64;
// Open the sidecar writer (this handles partial records and uncommitted tail)
let mut csoff_writer =
ChangesetOffsetWriter::new(&csoff_path, committed_len).map_err(ProviderError::other)?;
// If we have offsets, validate they don't exceed actual rows
if committed_len > 0 {
// Edge case: if actual_rows is 0 but we have offsets, all offsets are invalid
if actual_rows == 0 {
tracing::warn!(
target: "reth::static_file",
path = %csoff_path.display(),
committed_len,
"Truncating all changeset offsets - data file is empty"
);
csoff_writer.truncate(0).map_err(ProviderError::other)?;
self.writer.user_header_mut().set_changeset_offsets_len(0);
self.writer.user_header_mut().prune(committed_len);
self.writer.commit().map_err(ProviderError::other)?;
self.changeset_offsets = Some(csoff_writer);
return Ok(());
}
// Use with_len to respect committed length (writer already healed file to this length)
let mut reader =
ChangesetOffsetReader::with_len(&csoff_path, committed_len)
.map_err(ProviderError::other)?;
if let Some(last_offset) =
reader.get(committed_len - 1).map_err(ProviderError::other)?
{
let end_row = last_offset.offset() + last_offset.num_changes();
if end_row > actual_rows {
// Sidecar has offsets pointing past EOF - need to truncate blocks
// Find the last valid block (where offset + num_changes <= actual_rows)
let mut valid_blocks = 0u64;
for i in 0..committed_len {
if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
if offset.offset() + offset.num_changes() <= actual_rows {
valid_blocks = i + 1;
} else {
break;
}
}
}
tracing::warn!(
target: "reth::static_file",
path = %csoff_path.display(),
committed_len,
actual_rows,
valid_blocks,
"Truncating changeset offsets that point past healed data"
);
// Truncate sidecar to valid blocks
csoff_writer.truncate(valid_blocks).map_err(ProviderError::other)?;
// Update header to match
self.writer.user_header_mut().set_changeset_offsets_len(valid_blocks);
// Also need to update block range if we removed blocks
if valid_blocks < committed_len {
let blocks_removed = committed_len - valid_blocks;
self.writer.user_header_mut().prune(blocks_removed);
}
// Commit the healed state
self.writer.commit().map_err(ProviderError::other)?;
}
}
}
self.changeset_offsets = Some(csoff_writer);
Ok(())
}
/// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
///
/// This is idempotent - safe to call multiple times. After flushing, the current offset
/// is cleared to prevent duplicate writes.
///
/// This must be called before committing or syncing to ensure the last block's offset
/// is persisted, since `increment_block()` only writes the *previous* block's offset.
fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
if !self.writer.user_header().segment().is_change_based() {
return Ok(());
}
if let Some(offset) = self.current_changeset_offset.take() &&
let Some(writer) = &mut self.changeset_offsets
{
writer.append(&offset).map_err(ProviderError::other)?;
}
Ok(())
}
/// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
///
/// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
/// configuration and mark the writer as clean.
@@ -346,6 +469,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
if self.prune_on_commit.is_some() {
return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
}
// Write the final block's offset and sync the sidecar for changeset segments
self.flush_current_changeset_offset()?;
if let Some(writer) = &mut self.changeset_offsets {
writer.sync().map_err(ProviderError::other)?;
// Update the header with the actual number of offsets written
self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
}
if self.writer.is_dirty() {
self.writer.sync_all().map_err(ProviderError::other)?;
}
@@ -364,7 +496,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
}
if self.writer.is_dirty() {
if !self.synced {
self.writer.sync_all().map_err(ProviderError::other)?;
// Must call self.sync_all() to flush changeset offsets and update
// the header's changeset_offsets_len, not just the inner writer
self.sync_all()?;
}
self.writer.finalize().map_err(ProviderError::other)?;
@@ -405,6 +539,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
}
}
// For changeset segments, flush and sync the sidecar file before committing the main file.
// This ensures crash consistency: the sidecar is durable before the header references it.
self.flush_current_changeset_offset()?;
if let Some(writer) = &mut self.changeset_offsets {
writer.sync().map_err(ProviderError::other)?;
// Update the header with the actual number of offsets written
self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
}
if self.writer.is_dirty() {
debug!(
target: "provider::static_file",
@@ -548,7 +691,16 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
let (writer, data_path) =
Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
self.writer = writer;
self.data_path = data_path;
self.data_path = data_path.clone();
// Update changeset offsets writer for the new file (starts empty)
if segment.is_change_based() {
let csoff_path = data_path.with_extension("csoff");
self.changeset_offsets = Some(
ChangesetOffsetWriter::new(&csoff_path, 0)
.map_err(ProviderError::other)?,
);
}
*self.writer.user_header_mut() = SegmentHeader::new(
self.reader().find_fixed_range(segment, last_block + 1),
@@ -560,6 +712,20 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
}
self.writer.user_header_mut().increment_block();
// Handle changeset offset tracking for changeset segments
if segment.is_change_based() {
// Write previous block's offset if we have one
if let Some(offset) = self.current_changeset_offset.take() &&
let Some(writer) = &mut self.changeset_offsets
{
writer.append(&offset).map_err(ProviderError::other)?;
}
// Start tracking new block's offset
let new_offset = self.writer.rows() as u64;
self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
}
if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
segment,
@@ -631,13 +797,6 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
expected_block_start = self.writer.user_header().expected_block_start();
}
// Now we're in the correct file, we need to find how many rows to prune
// We need to iterate through the changesets to find the correct position
// Since changesets are stored per block, we need to find the offset for the block
let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| {
ProviderError::other(StaticFileWriterError::new("Missing changeset offsets"))
})?;
// Find the number of rows to keep (up to and including last_block)
let blocks_to_keep = if last_block >= expected_block_start {
last_block - expected_block_start + 1
@@ -645,18 +804,27 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
0
};
// Read changeset offsets from sidecar file to find where to truncate
let csoff_path = self.data_path.with_extension("csoff");
let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
let rows_to_keep = if blocks_to_keep == 0 {
0
} else if blocks_to_keep as usize > changeset_offsets.len() {
// Keep all rows in this file (shouldn't happen if data is consistent)
self.writer.rows() as u64
} else if blocks_to_keep as usize == changeset_offsets.len() {
// Keep all rows
} else if blocks_to_keep >= changeset_offsets_len {
// Keep all rows in this file
self.writer.rows() as u64
} else {
// Find the offset for the block after last_block
// This gives us the number of rows to keep
changeset_offsets[blocks_to_keep as usize].offset()
// Read offset for the block after last_block from sidecar
// Use with_len to respect the committed length from header, not file length
let mut reader =
ChangesetOffsetReader::with_len(&csoff_path, changeset_offsets_len)
.map_err(ProviderError::other)?;
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
next_offset.offset()
} else {
// If we can't read the offset, keep all rows
self.writer.rows() as u64
}
};
let total_rows = self.writer.rows() as u64;
@@ -684,6 +852,14 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
// Sync changeset offsets to match the new block range
self.writer.user_header_mut().sync_changeset_offsets();
// Truncate the sidecar file to match the new block count
if let Some(writer) = &mut self.changeset_offsets {
writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
}
// Clear current changeset offset tracking since we've pruned
self.current_changeset_offset = None;
// Commits new changes to disk
self.commit()?;
@@ -764,16 +940,36 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// Delete the current static file, and replace this provider writer with the previous static
/// file.
fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
let segment = self.user_header().segment();
let current_path = self.data_path.clone();
let (previous_writer, data_path) = Self::open(
self.user_header().segment(),
segment,
self.writer.user_header().expected_block_start() - 1,
self.reader.clone(),
self.metrics.clone(),
)?;
self.writer = previous_writer;
self.writer.set_dirty();
self.data_path = data_path;
self.data_path = data_path.clone();
// Delete the sidecar file for changeset segments before deleting the main jar
if segment.is_change_based() {
let csoff_path = current_path.with_extension("csoff");
if csoff_path.exists() {
std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
}
// Re-initialize the changeset offsets writer for the previous file
let new_csoff_path = data_path.with_extension("csoff");
let committed_len = self.writer.user_header().changeset_offsets_len();
self.changeset_offsets = Some(
ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
.map_err(ProviderError::other)?,
);
}
// Clear current changeset offset tracking since we're switching files
self.current_changeset_offset = None;
NippyJar::<SegmentHeader>::load(&current_path)
.map_err(ProviderError::other)?
.delete()
@@ -817,10 +1013,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// Appends change to changeset static file.
fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
if self.writer.user_header().changeset_offsets().is_some() {
self.writer.user_header_mut().increment_block_changes();
if let Some(ref mut offset) = self.current_changeset_offset {
offset.increment_num_changes();
}
self.append_column(change)?;
Ok(())
}