mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
18 Commits
epbs-devne
...
pr-21596
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3c0b81dc71 | ||
|
|
04cf37ad4a | ||
|
|
9edc72e278 | ||
|
|
20a182abc3 | ||
|
|
1f951685cb | ||
|
|
33e695349e | ||
|
|
5e2b06ba0c | ||
|
|
f0a9ac0ff6 | ||
|
|
3ad2425bd2 | ||
|
|
c263eac79a | ||
|
|
8ffcde8d1a | ||
|
|
262149069a | ||
|
|
4a5d43dc46 | ||
|
|
17d3e80953 | ||
|
|
5282231574 | ||
|
|
ba9bc425d9 | ||
|
|
4e2fa3db44 | ||
|
|
34811b908c |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -10900,6 +10900,8 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"strum 0.27.2",
|
||||
"tempfile",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -67,6 +67,7 @@ tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
edge = ["reth-provider/edge"]
|
||||
serde = [
|
||||
"reth-exex-types/serde",
|
||||
"reth-revm/serde",
|
||||
|
||||
@@ -38,3 +38,4 @@ assert_matches.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
edge = ["reth-stages/edge"]
|
||||
|
||||
@@ -13,6 +13,7 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
alloy-primitives.workspace = true
|
||||
tracing = { workspace = true, optional = true }
|
||||
|
||||
clap = { workspace = true, features = ["derive"], optional = true }
|
||||
fixed-map.workspace = true
|
||||
@@ -24,6 +25,7 @@ strum = { workspace = true, features = ["derive"] }
|
||||
reth-nippy-jar.workspace = true
|
||||
serde_json.workspace = true
|
||||
insta.workspace = true
|
||||
tempfile.workspace = true
|
||||
|
||||
[features]
|
||||
default = ["std"]
|
||||
@@ -34,5 +36,7 @@ std = [
|
||||
"strum/std",
|
||||
"serde_json/std",
|
||||
"fixed-map/std",
|
||||
"dep:tracing",
|
||||
"tracing?/std",
|
||||
]
|
||||
clap = ["dep:clap"]
|
||||
|
||||
471
crates/static-file/types/src/changeset_offsets.rs
Normal file
471
crates/static-file/types/src/changeset_offsets.rs
Normal file
@@ -0,0 +1,471 @@
|
||||
//! Changeset offset sidecar file I/O.
|
||||
//!
|
||||
//! Provides append-only writing and O(1) random-access reading for changeset offsets.
|
||||
//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`.
|
||||
|
||||
use crate::ChangesetOffset;
|
||||
use std::{
|
||||
fs::{File, OpenOptions},
|
||||
io::{self, Read, Seek, SeekFrom, Write},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
/// Writer for appending changeset offsets to a sidecar file.
|
||||
#[derive(Debug)]
|
||||
pub struct ChangesetOffsetWriter {
|
||||
file: File,
|
||||
/// Number of records written.
|
||||
records_written: u64,
|
||||
}
|
||||
|
||||
impl ChangesetOffsetWriter {
|
||||
/// Record size in bytes.
|
||||
const RECORD_SIZE: usize = 16;
|
||||
|
||||
/// Opens or creates the changeset offset file for appending.
|
||||
///
|
||||
/// The file is healed to match `committed_len` (from the segment header):
|
||||
/// - Partial records (from crash mid-write) are truncated to record boundary
|
||||
/// - Extra complete records (from crash after sidecar sync but before header commit)
|
||||
/// are truncated to match the committed length
|
||||
/// - If the file has fewer records than committed, returns an error (data corruption)
|
||||
///
|
||||
/// This mirrors NippyJar's healing behavior where config/header is the commit boundary.
|
||||
pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(path.as_ref())?;
|
||||
|
||||
let file_len = file.metadata()?.len();
|
||||
let remainder = file_len % Self::RECORD_SIZE as u64;
|
||||
|
||||
// First, truncate any partial record from crash mid-write
|
||||
let aligned_len = if remainder != 0 {
|
||||
let truncated_len = file_len - remainder;
|
||||
#[cfg(feature = "std")]
|
||||
tracing::warn!(
|
||||
target: "reth::static_file",
|
||||
path = %path.as_ref().display(),
|
||||
original_len = file_len,
|
||||
truncated_len,
|
||||
"Truncating partial changeset offset record"
|
||||
);
|
||||
file.set_len(truncated_len)?;
|
||||
file.sync_all()?; // Sync required for crash safety
|
||||
truncated_len
|
||||
} else {
|
||||
file_len
|
||||
};
|
||||
|
||||
let records_in_file = aligned_len / Self::RECORD_SIZE as u64;
|
||||
|
||||
// Heal sidecar to match committed header length
|
||||
match records_in_file.cmp(&committed_len) {
|
||||
std::cmp::Ordering::Greater => {
|
||||
// Sidecar has uncommitted records from a crash - truncate them
|
||||
let target_len = committed_len * Self::RECORD_SIZE as u64;
|
||||
#[cfg(feature = "std")]
|
||||
tracing::warn!(
|
||||
target: "reth::static_file",
|
||||
path = %path.as_ref().display(),
|
||||
sidecar_records = records_in_file,
|
||||
committed_len,
|
||||
"Truncating uncommitted changeset offset records after crash recovery"
|
||||
);
|
||||
file.set_len(target_len)?;
|
||||
file.sync_all()?; // Sync required for crash safety
|
||||
}
|
||||
std::cmp::Ordering::Less => {
|
||||
// Unlikely: sidecar is shorter than header claims - data corruption or
|
||||
// incomplete prune
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"Changeset offset sidecar has {} records but header expects {}: {}",
|
||||
records_in_file,
|
||||
committed_len,
|
||||
path.as_ref().display()
|
||||
),
|
||||
));
|
||||
}
|
||||
std::cmp::Ordering::Equal => {}
|
||||
}
|
||||
|
||||
let records_written = committed_len;
|
||||
let file = OpenOptions::new().create(true).append(true).open(path)?;
|
||||
|
||||
Ok(Self { file, records_written })
|
||||
}
|
||||
|
||||
/// Appends a single changeset offset record.
|
||||
pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
|
||||
let mut buf = [0u8; Self::RECORD_SIZE];
|
||||
buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
|
||||
buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
|
||||
self.file.write_all(&buf)?;
|
||||
self.records_written += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends multiple changeset offset records.
|
||||
pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
|
||||
for offset in offsets {
|
||||
self.append(offset)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Syncs all data to disk. Must be called before committing the header.
|
||||
pub fn sync(&mut self) -> io::Result<()> {
|
||||
self.file.sync_all()
|
||||
}
|
||||
|
||||
/// Truncates the file to contain exactly `len` records and syncs to disk.
|
||||
/// Used after prune operations to reclaim space.
|
||||
///
|
||||
/// The sync is required for crash safety - without it, a crash could
|
||||
/// resurrect the old file length.
|
||||
pub fn truncate(&mut self, len: u64) -> io::Result<()> {
|
||||
self.file.set_len(len * Self::RECORD_SIZE as u64)?;
|
||||
self.file.sync_all()?;
|
||||
self.records_written = len;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the number of records in the file.
|
||||
pub const fn len(&self) -> u64 {
|
||||
self.records_written
|
||||
}
|
||||
|
||||
/// Returns true if the file is empty.
|
||||
pub const fn is_empty(&self) -> bool {
|
||||
self.records_written == 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Reader for changeset offsets with O(1) random access.
|
||||
#[derive(Debug)]
|
||||
pub struct ChangesetOffsetReader {
|
||||
file: File,
|
||||
/// Cached file length in records.
|
||||
len: u64,
|
||||
}
|
||||
|
||||
impl ChangesetOffsetReader {
|
||||
/// Record size in bytes.
|
||||
const RECORD_SIZE: usize = 16;
|
||||
|
||||
/// Opens the changeset offset file for reading.
|
||||
pub fn new(path: impl AsRef<Path>) -> io::Result<Self> {
|
||||
let file = File::open(path)?;
|
||||
let len = file.metadata()?.len() / Self::RECORD_SIZE as u64;
|
||||
Ok(Self { file, len })
|
||||
}
|
||||
|
||||
/// Opens with an explicit length (from header metadata).
|
||||
/// Any records beyond `len` are ignored.
|
||||
///
|
||||
/// Returns an error if the file has fewer records than `len` (data corruption).
|
||||
pub fn with_len(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
|
||||
let file = File::open(&path)?;
|
||||
let file_len = file.metadata()?.len();
|
||||
let records_in_file = file_len / Self::RECORD_SIZE as u64;
|
||||
|
||||
if records_in_file < len {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!(
|
||||
"Changeset offset sidecar has {} records but expected at least {}: {}",
|
||||
records_in_file,
|
||||
len,
|
||||
path.as_ref().display()
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(Self { file, len })
|
||||
}
|
||||
|
||||
/// Reads a single changeset offset by block index.
|
||||
/// Returns None if index is out of bounds.
|
||||
pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
|
||||
if block_index >= self.len {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let byte_pos = block_index * Self::RECORD_SIZE as u64;
|
||||
self.file.seek(SeekFrom::Start(byte_pos))?;
|
||||
|
||||
let mut buf = [0u8; Self::RECORD_SIZE];
|
||||
self.file.read_exact(&mut buf)?;
|
||||
|
||||
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
|
||||
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
|
||||
|
||||
Ok(Some(ChangesetOffset::new(offset, num_changes)))
|
||||
}
|
||||
|
||||
/// Reads a range of changeset offsets.
|
||||
pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
|
||||
let end = end.min(self.len);
|
||||
if start >= end {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let count = (end - start) as usize;
|
||||
let byte_pos = start * Self::RECORD_SIZE as u64;
|
||||
self.file.seek(SeekFrom::Start(byte_pos))?;
|
||||
|
||||
let mut result = Vec::with_capacity(count);
|
||||
let mut buf = [0u8; Self::RECORD_SIZE];
|
||||
|
||||
for _ in 0..count {
|
||||
self.file.read_exact(&mut buf)?;
|
||||
let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
|
||||
let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
|
||||
result.push(ChangesetOffset::new(offset, num_changes));
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Returns the number of valid records.
|
||||
pub const fn len(&self) -> u64 {
|
||||
self.len
|
||||
}
|
||||
|
||||
/// Returns true if there are no records.
|
||||
pub const fn is_empty(&self) -> bool {
|
||||
self.len == 0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Asserts that `entry` carries the expected `offset` and `num_changes`.
    fn assert_entry(entry: &ChangesetOffset, offset: u64, num_changes: u64) {
        assert_eq!(entry.offset(), offset);
        assert_eq!(entry.num_changes(), num_changes);
    }

    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write (new file, committed_len=0)
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 3)).unwrap();
            writer.append(&ChangesetOffset::new(8, 10)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Read
        {
            let mut reader = ChangesetOffsetReader::new(&path).unwrap();
            assert_eq!(reader.len(), 3);

            assert_entry(&reader.get(0).unwrap().unwrap(), 0, 5);
            assert_entry(&reader.get(1).unwrap().unwrap(), 5, 3);
            assert_entry(&reader.get(2).unwrap().unwrap(), 8, 10);

            assert!(reader.get(3).unwrap().is_none());
        }
    }

    #[test]
    fn test_append_many_and_get_range() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        assert!(writer.is_empty());

        // An empty batch must not change the record count
        writer.append_many(&[]).unwrap();
        assert!(writer.is_empty());

        writer
            .append_many(&[
                ChangesetOffset::new(0, 1),
                ChangesetOffset::new(1, 2),
                ChangesetOffset::new(3, 3),
                ChangesetOffset::new(6, 4),
            ])
            .unwrap();
        writer.sync().unwrap();
        assert_eq!(writer.len(), 4);
        assert!(!writer.is_empty());

        let mut reader = ChangesetOffsetReader::new(&path).unwrap();
        assert!(!reader.is_empty());
        assert_eq!(reader.len(), 4);

        // Range starting in the middle of the file; `end` is exclusive
        let range = reader.get_range(1, 3).unwrap();
        assert_eq!(range.len(), 2);
        assert_entry(&range[0], 1, 2);
        assert_entry(&range[1], 3, 3);

        // Empty, inverted and fully out-of-bounds ranges yield nothing
        assert!(reader.get_range(2, 2).unwrap().is_empty());
        assert!(reader.get_range(3, 1).unwrap().is_empty());
        assert!(reader.get_range(4, 10).unwrap().is_empty());

        // Random access still works after a range read moved the file cursor
        assert_entry(&reader.get(0).unwrap().unwrap(), 0, 1);
        assert_entry(&reader.get(3).unwrap().unwrap(), 6, 4);
    }

    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        writer.append(&ChangesetOffset::new(0, 1)).unwrap();
        writer.append(&ChangesetOffset::new(1, 2)).unwrap();
        writer.append(&ChangesetOffset::new(3, 3)).unwrap();
        writer.sync().unwrap();

        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        let mut reader = ChangesetOffsetReader::new(&path).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 full record (16 bytes) + 8 trailing bytes (partial record)
        {
            let mut file = std::fs::File::create(&path).unwrap();
            // Full record: offset=100, num_changes=5
            file.write_all(&100u64.to_le_bytes()).unwrap();
            file.write_all(&5u64.to_le_bytes()).unwrap();
            // Partial record: only 8 bytes (incomplete)
            file.write_all(&200u64.to_le_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        // Verify file has 24 bytes before opening with writer
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);

        // Open with writer, committed_len=1 (header committed 1 record)
        // Should truncate partial record and match committed length
        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);

        // Verify file was truncated to 16 bytes
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

        // Verify the complete record is readable
        let mut reader = ChangesetOffsetReader::new(&path).unwrap();
        assert_eq!(reader.len(), 1);
        assert_entry(&reader.get(0).unwrap().unwrap(), 100, 5);
    }

    #[test]
    fn test_with_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 3 records
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 10)).unwrap();
            writer.append(&ChangesetOffset::new(10, 20)).unwrap();
            writer.append(&ChangesetOffset::new(30, 30)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Open with explicit len=2, ignoring the 3rd record
        let mut reader = ChangesetOffsetReader::with_len(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);

        // First two records should be readable
        assert_entry(&reader.get(0).unwrap().unwrap(), 0, 10);
        assert_entry(&reader.get(1).unwrap().unwrap(), 10, 20);

        // Third record should be out of bounds (due to len=2)
        assert!(reader.get(2).unwrap().is_none());

        // get_range should also respect the len bound
        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        // Simulates crash recovery where sidecar has more records than committed header length.
        // ChangesetOffsetWriter::new() should automatically truncate to committed_len.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Simulate: wrote 3 records, synced sidecar, but header only committed len=2
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 10)).unwrap();
            writer.append(&ChangesetOffset::new(15, 7)).unwrap(); // uncommitted
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // On "restart", new() heals by truncating to committed length
        let committed_len = 2u64;
        {
            let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
            assert_eq!(writer.len(), 2); // Healed to committed length
        }

        // Verify file is now correct length and new appends go to the right place
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
            assert_eq!(writer.len(), 2);

            // Append a new record - should be at index 2, not index 3
            writer.append(&ChangesetOffset::new(15, 20)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Verify the records are correct
        {
            let mut reader = ChangesetOffsetReader::new(&path).unwrap();
            assert_eq!(reader.len(), 3);

            assert_entry(&reader.get(0).unwrap().unwrap(), 0, 5);
            assert_entry(&reader.get(1).unwrap().unwrap(), 5, 10);

            // This should be the NEW record (num_changes=20), not the old uncommitted one (7)
            assert_entry(&reader.get(2).unwrap().unwrap(), 15, 20);
        }
    }

    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        // If sidecar has fewer records than committed, it's data corruption - should error.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 record
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.sync().unwrap();
        }

        // Try to open with committed_len=3 (header claims more than file has)
        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn test_reader_with_len_longer_than_file_errors() {
        // If header claims more records than file has, with_len should error (data corruption).
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 record
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.sync().unwrap();
        }

        // Try to open reader with len=3 (header claims more than file has)
        let result = ChangesetOffsetReader::with_len(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}
|
||||
@@ -15,11 +15,18 @@ mod compression;
|
||||
mod event;
|
||||
mod segment;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
mod changeset_offsets;
|
||||
#[cfg(feature = "std")]
|
||||
pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
|
||||
|
||||
use alloy_primitives::BlockNumber;
|
||||
pub use compression::Compression;
|
||||
use core::ops::RangeInclusive;
|
||||
pub use event::StaticFileProducerEvent;
|
||||
pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
|
||||
pub use segment::{
|
||||
ChangesetOffset, SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
|
||||
};
|
||||
|
||||
/// Map keyed by [`StaticFileSegment`].
|
||||
pub type StaticFileMap<T> = alloc::boxed::Box<fixed_map::Map<StaticFileSegment, T>>;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{find_fixed_range, BlockNumber, Compression};
|
||||
use alloc::{format, string::String, vec::Vec};
|
||||
use alloc::{format, string::String};
|
||||
use alloy_primitives::TxNumber;
|
||||
use core::{
|
||||
ops::{Range, RangeInclusive},
|
||||
@@ -211,6 +211,11 @@ pub struct ChangesetOffset {
|
||||
}
|
||||
|
||||
impl ChangesetOffset {
|
||||
/// Creates a new changeset offset.
|
||||
pub const fn new(offset: u64, num_changes: u64) -> Self {
|
||||
Self { offset, num_changes }
|
||||
}
|
||||
|
||||
/// Returns the start offset for the row for this block
|
||||
pub const fn offset(&self) -> u64 {
|
||||
self.offset
|
||||
@@ -225,6 +230,11 @@ impl ChangesetOffset {
|
||||
pub const fn changeset_range(&self) -> Range<u64> {
|
||||
self.offset..(self.offset + self.num_changes)
|
||||
}
|
||||
|
||||
/// Increments the number of changes by 1.
|
||||
pub const fn increment_num_changes(&mut self) {
|
||||
self.num_changes += 1;
|
||||
}
|
||||
}
|
||||
|
||||
/// A segment header that contains information common to all segments. Used for storage.
|
||||
@@ -242,7 +252,7 @@ pub struct SegmentHeader {
|
||||
/// Segment type
|
||||
segment: StaticFileSegment,
|
||||
/// List of offsets, for where each block's changeset starts.
|
||||
changeset_offsets: Option<Vec<ChangesetOffset>>,
|
||||
changeset_offsets_len: u64,
|
||||
}
|
||||
|
||||
struct SegmentHeaderVisitor;
|
||||
@@ -271,21 +281,18 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
|
||||
let segment: StaticFileSegment =
|
||||
seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;
|
||||
|
||||
let changeset_offsets = if segment.is_change_based() {
|
||||
// Try to read the 5th field (changeset_offsets)
|
||||
// If it doesn't exist (old format), this will return None
|
||||
match seq.next_element()? {
|
||||
Some(Some(offsets)) => Some(offsets),
|
||||
// Changesets should have offsets
|
||||
Some(None) => None,
|
||||
let changeset_offsets_len = if segment.is_change_based() {
|
||||
// Try to read the 5th field (changeset_offsets_len)
|
||||
match seq.next_element::<u64>()? {
|
||||
Some(len) => len,
|
||||
None => {
|
||||
return Err(serde::de::Error::custom(
|
||||
"changeset_offsets should exist for static files",
|
||||
"changeset_offsets_len should exist for changeset static files",
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
0
|
||||
};
|
||||
|
||||
Ok(SegmentHeader {
|
||||
@@ -293,7 +300,7 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
|
||||
block_range,
|
||||
tx_range,
|
||||
segment,
|
||||
changeset_offsets,
|
||||
changeset_offsets_len,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -328,7 +335,7 @@ impl Serialize for SegmentHeader {
|
||||
state.serialize_field("segment", &self.segment)?;
|
||||
|
||||
if self.segment.is_change_based() {
|
||||
state.serialize_field("changeset_offsets", &self.changeset_offsets)?;
|
||||
state.serialize_field("changeset_offsets", &self.changeset_offsets_len)?;
|
||||
}
|
||||
|
||||
state.end()
|
||||
@@ -343,7 +350,7 @@ impl SegmentHeader {
|
||||
tx_range: Option<SegmentRangeInclusive>,
|
||||
segment: StaticFileSegment,
|
||||
) -> Self {
|
||||
Self { expected_block_range, block_range, tx_range, segment, changeset_offsets: None }
|
||||
Self { expected_block_range, block_range, tx_range, segment, changeset_offsets_len: 0 }
|
||||
}
|
||||
|
||||
/// Returns the static file segment kind.
|
||||
@@ -366,9 +373,20 @@ impl SegmentHeader {
|
||||
self.tx_range
|
||||
}
|
||||
|
||||
/// Returns the changeset offsets.
|
||||
pub const fn changeset_offsets(&self) -> Option<&Vec<ChangesetOffset>> {
|
||||
self.changeset_offsets.as_ref()
|
||||
/// Returns the number of changeset offset entries.
|
||||
/// The actual offsets are stored in the .csoff sidecar file.
|
||||
pub const fn changeset_offsets_len(&self) -> u64 {
|
||||
self.changeset_offsets_len
|
||||
}
|
||||
|
||||
/// Sets the changeset offsets length.
|
||||
pub const fn set_changeset_offsets_len(&mut self, len: u64) {
|
||||
self.changeset_offsets_len = len;
|
||||
}
|
||||
|
||||
/// Increments the changeset offsets length by 1.
|
||||
pub const fn increment_changeset_offsets_len(&mut self) {
|
||||
self.changeset_offsets_len += 1;
|
||||
}
|
||||
|
||||
/// The expected block start of the segment.
|
||||
@@ -426,7 +444,7 @@ impl SegmentHeader {
|
||||
}
|
||||
|
||||
/// Increments block end range depending on segment
|
||||
pub fn increment_block(&mut self) -> BlockNumber {
|
||||
pub const fn increment_block(&mut self) -> BlockNumber {
|
||||
let block_num = if let Some(block_range) = &mut self.block_range {
|
||||
block_range.end += 1;
|
||||
block_range.end
|
||||
@@ -438,20 +456,10 @@ impl SegmentHeader {
|
||||
self.expected_block_start()
|
||||
};
|
||||
|
||||
// For changeset segments, initialize an offset entry for the new block
|
||||
// For changeset segments, increment the offsets length.
|
||||
// The actual offset entry is written to the sidecar file by the caller.
|
||||
if self.segment.is_change_based() {
|
||||
let offsets = self.changeset_offsets.get_or_insert_default();
|
||||
// Calculate the offset for the new block
|
||||
let new_offset = if let Some(last_offset) = offsets.last() {
|
||||
// The new block starts after the last block's changes
|
||||
last_offset.offset + last_offset.num_changes
|
||||
} else {
|
||||
// First block starts at offset 0
|
||||
0
|
||||
};
|
||||
|
||||
// Add a new offset entry with 0 changes initially
|
||||
offsets.push(ChangesetOffset { offset: new_offset, num_changes: 0 });
|
||||
self.changeset_offsets_len += 1;
|
||||
}
|
||||
|
||||
block_num
|
||||
@@ -468,23 +476,11 @@ impl SegmentHeader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Increments the latest block's number of changes.
|
||||
pub fn increment_block_changes(&mut self) {
|
||||
debug_assert!(self.segment().is_change_based());
|
||||
if self.segment.is_change_based() {
|
||||
let offsets = self.changeset_offsets.get_or_insert_with(Default::default);
|
||||
if let Some(last_offset) = offsets.last_mut() {
|
||||
last_offset.num_changes += 1;
|
||||
} else {
|
||||
// If offsets is empty, we are adding the first change for a block
|
||||
// The offset for the first block is 0
|
||||
offsets.push(ChangesetOffset { offset: 0, num_changes: 1 });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes `num` elements from end of tx or block range.
|
||||
pub fn prune(&mut self, num: u64) {
|
||||
///
|
||||
/// For changeset segments, also decrements the changeset offsets length.
|
||||
/// The caller must truncate the sidecar file accordingly.
|
||||
pub const fn prune(&mut self, num: u64) {
|
||||
// Changesets also contain a block range, but are not strictly block-based
|
||||
if self.segment.is_block_or_change_based() {
|
||||
if let Some(range) = &mut self.block_range {
|
||||
@@ -492,26 +488,18 @@ impl SegmentHeader {
|
||||
self.block_range = None;
|
||||
// Clear all changeset offsets if we're clearing all blocks
|
||||
if self.segment.is_change_based() {
|
||||
self.changeset_offsets = None;
|
||||
self.changeset_offsets_len = 0;
|
||||
}
|
||||
} else {
|
||||
let old_end = range.end;
|
||||
range.end = range.end.saturating_sub(num);
|
||||
|
||||
// Update changeset offsets for account changesets
|
||||
if self.segment.is_change_based() &&
|
||||
let Some(offsets) = &mut self.changeset_offsets
|
||||
{
|
||||
// Update changeset offsets length for changeset segments
|
||||
if self.segment.is_change_based() {
|
||||
// Calculate how many blocks we're removing
|
||||
let blocks_to_remove = old_end - range.end;
|
||||
// Remove the last `blocks_to_remove` entries from offsets
|
||||
let new_len = offsets.len().saturating_sub(blocks_to_remove as usize);
|
||||
offsets.truncate(new_len);
|
||||
|
||||
// If we removed all offsets, set to None
|
||||
if offsets.is_empty() {
|
||||
self.changeset_offsets = None;
|
||||
}
|
||||
self.changeset_offsets_len =
|
||||
self.changeset_offsets_len.saturating_sub(blocks_to_remove);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -534,28 +522,24 @@ impl SegmentHeader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Synchronizes changeset offsets with the current block range for account changeset segments.
|
||||
/// Synchronizes changeset offsets length with the current block range for changeset segments.
|
||||
///
|
||||
/// This should be called after modifying the block range when dealing with changeset segments
|
||||
/// to ensure the offsets vector matches the block range size.
|
||||
pub fn sync_changeset_offsets(&mut self) {
|
||||
/// to ensure the offsets length matches the block range size.
|
||||
/// The caller must also truncate the sidecar file accordingly.
|
||||
pub const fn sync_changeset_offsets(&mut self) {
|
||||
if !self.segment.is_change_based() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(block_range) = &self.block_range {
|
||||
if let Some(offsets) = &mut self.changeset_offsets {
|
||||
let expected_len = (block_range.end - block_range.start + 1) as usize;
|
||||
if offsets.len() > expected_len {
|
||||
offsets.truncate(expected_len);
|
||||
if offsets.is_empty() {
|
||||
self.changeset_offsets = None;
|
||||
}
|
||||
}
|
||||
let expected_len = block_range.end - block_range.start + 1;
|
||||
if self.changeset_offsets_len > expected_len {
|
||||
self.changeset_offsets_len = expected_len;
|
||||
}
|
||||
} else {
|
||||
// No block range means no offsets
|
||||
self.changeset_offsets = None;
|
||||
self.changeset_offsets_len = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -581,21 +565,22 @@ impl SegmentHeader {
|
||||
self.tx_start()
|
||||
}
|
||||
|
||||
/// Returns the `ChangesetOffset` corresponding for the given block, if it's in the block
|
||||
/// range.
|
||||
/// Returns the index into the sidecar file for a given block's changeset offset.
|
||||
///
|
||||
/// If it is not in the block range or the changeset list in the header does not contain a
|
||||
/// value for the block, this returns `None`.
|
||||
pub fn changeset_offset(&self, block: BlockNumber) -> Option<&ChangesetOffset> {
|
||||
/// Returns `None` if the block is not in the block range.
|
||||
/// The caller must read the actual offset from the sidecar file at this index.
|
||||
pub fn changeset_offset_index(&self, block: BlockNumber) -> Option<u64> {
|
||||
let block_range = self.block_range()?;
|
||||
if !block_range.contains(block) {
|
||||
return None
|
||||
return None;
|
||||
}
|
||||
|
||||
let offsets = self.changeset_offsets.as_ref()?;
|
||||
let index = (block - block_range.start()) as usize;
|
||||
let index = block - block_range.start();
|
||||
if index >= self.changeset_offsets_len {
|
||||
return None;
|
||||
}
|
||||
|
||||
offsets.get(index)
|
||||
Some(index)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -754,42 +739,42 @@ mod tests {
|
||||
block_range: Some(SegmentRangeInclusive::new(0, 100)),
|
||||
tx_range: None,
|
||||
segment: StaticFileSegment::Headers,
|
||||
changeset_offsets: None,
|
||||
changeset_offsets_len: 0,
|
||||
},
|
||||
SegmentHeader {
|
||||
expected_block_range: SegmentRangeInclusive::new(0, 200),
|
||||
block_range: None,
|
||||
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
|
||||
segment: StaticFileSegment::Transactions,
|
||||
changeset_offsets: None,
|
||||
changeset_offsets_len: 0,
|
||||
},
|
||||
SegmentHeader {
|
||||
expected_block_range: SegmentRangeInclusive::new(0, 200),
|
||||
block_range: Some(SegmentRangeInclusive::new(0, 100)),
|
||||
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
|
||||
segment: StaticFileSegment::Receipts,
|
||||
changeset_offsets: None,
|
||||
changeset_offsets_len: 0,
|
||||
},
|
||||
SegmentHeader {
|
||||
expected_block_range: SegmentRangeInclusive::new(0, 200),
|
||||
block_range: Some(SegmentRangeInclusive::new(0, 100)),
|
||||
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
|
||||
segment: StaticFileSegment::TransactionSenders,
|
||||
changeset_offsets: None,
|
||||
changeset_offsets_len: 0,
|
||||
},
|
||||
SegmentHeader {
|
||||
expected_block_range: SegmentRangeInclusive::new(0, 200),
|
||||
block_range: Some(SegmentRangeInclusive::new(0, 100)),
|
||||
tx_range: Some(SegmentRangeInclusive::new(0, 300)),
|
||||
segment: StaticFileSegment::AccountChangeSets,
|
||||
changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
|
||||
changeset_offsets_len: 100,
|
||||
},
|
||||
SegmentHeader {
|
||||
expected_block_range: SegmentRangeInclusive::new(0, 200),
|
||||
block_range: Some(SegmentRangeInclusive::new(0, 100)),
|
||||
tx_range: None,
|
||||
segment: StaticFileSegment::StorageChangeSets,
|
||||
changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
|
||||
changeset_offsets_len: 100,
|
||||
},
|
||||
];
|
||||
// Check that we test all segments
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
source: crates/static-file/types/src/segment.rs
|
||||
expression: "Bytes::from(serialized)"
|
||||
---
|
||||
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c010000000000000400000001640000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000
01000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
|
||||
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c0100000000000004000000640000000000000001000000000000000000000000000000000000000000000000
|
||||
|
||||
@@ -2,4 +2,4 @@
|
||||
source: crates/static-file/types/src/segment.rs
|
||||
expression: "Bytes::from(serialized)"
|
||||
---
|
||||
0x01000000000000000000000000000000c800000000000000010000000000000000640000000000000000050000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000
010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
|
||||
0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000005000000640000000000000001000000000000000000000000000000000000000000000000
|
||||
|
||||
@@ -59,6 +59,8 @@ const INDEX_FILE_EXTENSION: &str = "idx";
|
||||
const OFFSETS_FILE_EXTENSION: &str = "off";
|
||||
/// The file extension used for configuration files.
|
||||
pub const CONFIG_FILE_EXTENSION: &str = "conf";
|
||||
/// The file extension used for changeset offset sidecar files.
|
||||
pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff";
|
||||
|
||||
/// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
|
||||
/// memory-mapped file.
|
||||
@@ -240,13 +242,22 @@ impl<H: NippyJarHeader> NippyJar<H> {
|
||||
self.path.with_extension(CONFIG_FILE_EXTENSION)
|
||||
}
|
||||
|
||||
/// Returns the path for the changeset offsets sidecar file.
|
||||
pub fn changeset_offsets_path(&self) -> PathBuf {
|
||||
self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION)
|
||||
}
|
||||
|
||||
/// Deletes from disk this [`NippyJar`] alongside every satellite file.
|
||||
pub fn delete(self) -> Result<(), NippyJarError> {
|
||||
// TODO(joshie): ensure consistency on unexpected shutdown
|
||||
|
||||
for path in
|
||||
[self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
|
||||
{
|
||||
for path in [
|
||||
self.data_path().into(),
|
||||
self.index_path(),
|
||||
self.offsets_path(),
|
||||
self.config_path(),
|
||||
self.changeset_offsets_path(),
|
||||
] {
|
||||
if path.exists() {
|
||||
debug!(target: "nippy-jar", ?path, "Removing file.");
|
||||
reth_fs_util::remove_file(path)?;
|
||||
|
||||
@@ -17,6 +17,7 @@ use reth_db::static_file::{
|
||||
use reth_db_api::table::{Decompress, Value};
|
||||
use reth_node_types::NodePrimitives;
|
||||
use reth_primitives_traits::{SealedHeader, SignedTransaction};
|
||||
use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
|
||||
use reth_storage_api::range_size_hint;
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult};
|
||||
use std::{
|
||||
@@ -90,6 +91,61 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
|
||||
pub fn size(&self) -> usize {
|
||||
self.jar.value().size()
|
||||
}
|
||||
|
||||
/// Reads a changeset offset from the sidecar file for a given block.
|
||||
///
|
||||
/// Returns `None` if:
|
||||
/// - The segment is not change-based
|
||||
/// - The block is not in the block range
|
||||
/// - The sidecar file doesn't exist
|
||||
pub fn read_changeset_offset(
|
||||
&self,
|
||||
block_number: BlockNumber,
|
||||
) -> ProviderResult<Option<ChangesetOffset>> {
|
||||
let header = self.user_header();
|
||||
if !header.segment().is_change_based() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let Some(index) = header.changeset_offset_index(block_number) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let csoff_path = self.data_path().with_extension("csoff");
|
||||
if !csoff_path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut reader = ChangesetOffsetReader::new(&csoff_path).map_err(ProviderError::other)?;
|
||||
reader.get(index).map_err(ProviderError::other)
|
||||
}
|
||||
|
||||
/// Reads all changeset offsets from the sidecar file.
|
||||
///
|
||||
/// Returns `None` if:
|
||||
/// - The segment is not change-based
|
||||
/// - The sidecar file doesn't exist
|
||||
pub fn read_changeset_offsets(&self) -> ProviderResult<Option<Vec<ChangesetOffset>>> {
|
||||
let header = self.user_header();
|
||||
if !header.segment().is_change_based() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let len = header.changeset_offsets_len();
|
||||
if len == 0 {
|
||||
return Ok(Some(Vec::new()));
|
||||
}
|
||||
|
||||
let csoff_path = self.data_path().with_extension("csoff");
|
||||
if !csoff_path.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut reader =
|
||||
ChangesetOffsetReader::with_len(&csoff_path, len).map_err(ProviderError::other)?;
|
||||
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
|
||||
Ok(Some(offsets))
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProvider<'_, N> {
|
||||
|
||||
@@ -39,8 +39,8 @@ use reth_primitives_traits::{
|
||||
};
|
||||
use reth_stages_types::{PipelineTarget, StageId};
|
||||
use reth_static_file_types::{
|
||||
find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
|
||||
StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
|
||||
find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
|
||||
SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
|
||||
};
|
||||
use reth_storage_api::{
|
||||
BlockBodyIndicesProvider, ChangeSetReader, DBProvider, StorageChangeSetReader,
|
||||
@@ -961,10 +961,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
) -> ProviderResult<SegmentHeader> {
|
||||
let fixed_block_range = self.find_fixed_range(segment, block);
|
||||
let key = (fixed_block_range.end(), segment);
|
||||
let file = self.path.join(segment.filename(&fixed_block_range));
|
||||
let jar = if let Some((_, jar)) = self.map.remove(&key) {
|
||||
jar.jar
|
||||
} else {
|
||||
let file = self.path.join(segment.filename(&fixed_block_range));
|
||||
debug!(
|
||||
target: "provider::static_file",
|
||||
?file,
|
||||
@@ -976,6 +976,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
};
|
||||
|
||||
let header = jar.user_header().clone();
|
||||
|
||||
// Delete the sidecar file for changeset segments before deleting the main jar
|
||||
if segment.is_change_based() {
|
||||
let csoff_path = file.with_extension("csoff");
|
||||
if csoff_path.exists() {
|
||||
std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
|
||||
}
|
||||
}
|
||||
|
||||
jar.delete().map_err(ProviderError::other)?;
|
||||
|
||||
// SAFETY: this is currently necessary to ensure that certain indexes like
|
||||
@@ -2295,7 +2304,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
if let Some(offset) = provider.user_header().changeset_offset(block_number) {
|
||||
if let Some(offset) = provider.read_changeset_offset(block_number)? {
|
||||
let mut cursor = provider.cursor()?;
|
||||
let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
|
||||
|
||||
@@ -2327,9 +2336,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
let user_header = provider.user_header();
|
||||
|
||||
let Some(offset) = user_header.changeset_offset(block_number) else {
|
||||
let Some(offset) = provider.read_changeset_offset(block_number)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -2388,12 +2395,19 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
fn account_changeset_count(&self) -> ProviderResult<usize> {
|
||||
let mut count = 0;
|
||||
|
||||
// iterate through static files and sum changeset metadata via each static file header
|
||||
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
|
||||
if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
|
||||
for (_, header) in changeset_segments {
|
||||
if let Some(changeset_offsets) = header.changeset_offsets() {
|
||||
for offset in changeset_offsets {
|
||||
for (block_range, header) in changeset_segments {
|
||||
let csoff_path = self
|
||||
.path
|
||||
.join(StaticFileSegment::AccountChangeSets.filename(block_range))
|
||||
.with_extension("csoff");
|
||||
if csoff_path.exists() {
|
||||
let len = header.changeset_offsets_len();
|
||||
let mut reader = ChangesetOffsetReader::with_len(&csoff_path, len)
|
||||
.map_err(ProviderError::other)?;
|
||||
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
|
||||
for offset in offsets {
|
||||
count += offset.num_changes() as usize;
|
||||
}
|
||||
}
|
||||
@@ -2419,7 +2433,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
if let Some(offset) = provider.user_header().changeset_offset(block_number) {
|
||||
if let Some(offset) = provider.read_changeset_offset(block_number)? {
|
||||
let mut cursor = provider.cursor()?;
|
||||
let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
|
||||
|
||||
@@ -2452,8 +2466,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
let user_header = provider.user_header();
|
||||
let Some(offset) = user_header.changeset_offset(block_number) else {
|
||||
let Some(offset) = provider.read_changeset_offset(block_number)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
@@ -2510,9 +2523,17 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
|
||||
|
||||
let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
|
||||
if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
|
||||
for (_, header) in changeset_segments {
|
||||
if let Some(changeset_offsets) = header.changeset_offsets() {
|
||||
for offset in changeset_offsets {
|
||||
for (block_range, header) in changeset_segments {
|
||||
let csoff_path = self
|
||||
.path
|
||||
.join(StaticFileSegment::StorageChangeSets.filename(block_range))
|
||||
.with_extension("csoff");
|
||||
if csoff_path.exists() {
|
||||
let len = header.changeset_offsets_len();
|
||||
let mut reader = ChangesetOffsetReader::with_len(&csoff_path, len)
|
||||
.map_err(ProviderError::other)?;
|
||||
let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
|
||||
for offset in offsets {
|
||||
count += offset.num_changes() as usize;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -743,8 +743,9 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// Check that the segment header has changeset offsets
|
||||
assert!(provider.user_header().changeset_offsets().is_some());
|
||||
let offsets = provider.user_header().changeset_offsets().unwrap();
|
||||
let offsets = provider.read_changeset_offsets().unwrap();
|
||||
assert!(offsets.is_some());
|
||||
let offsets = offsets.unwrap();
|
||||
assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets
|
||||
|
||||
// Verify each block has the expected number of changes
|
||||
@@ -854,7 +855,8 @@ mod tests {
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
|
||||
let blocks_per_file = 10;
|
||||
let files_per_range = 3;
|
||||
// 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
|
||||
let files_per_range = 4;
|
||||
let file_set_count = 3;
|
||||
let initial_file_count = files_per_range * file_set_count;
|
||||
let tip = blocks_per_file * file_set_count - 1;
|
||||
@@ -923,7 +925,7 @@ mod tests {
|
||||
)?;
|
||||
|
||||
// Check offsets are valid
|
||||
let offsets = provider.user_header().changeset_offsets();
|
||||
let offsets = provider.read_changeset_offsets().unwrap();
|
||||
assert!(offsets.is_some(), "Should have changeset offsets");
|
||||
}
|
||||
|
||||
@@ -1087,8 +1089,9 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
// Check that the segment header has changeset offsets
|
||||
assert!(provider.user_header().changeset_offsets().is_some());
|
||||
let offsets = provider.user_header().changeset_offsets().unwrap();
|
||||
let offsets = provider.read_changeset_offsets().unwrap();
|
||||
assert!(offsets.is_some());
|
||||
let offsets = offsets.unwrap();
|
||||
assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets
|
||||
|
||||
// Verify each block has the expected number of changes
|
||||
@@ -1189,7 +1192,8 @@ mod tests {
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
|
||||
let blocks_per_file = 10;
|
||||
let files_per_range = 3;
|
||||
// 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
|
||||
let files_per_range = 4;
|
||||
let file_set_count = 3;
|
||||
let initial_file_count = files_per_range * file_set_count;
|
||||
let tip = blocks_per_file * file_set_count - 1;
|
||||
@@ -1248,7 +1252,7 @@ mod tests {
|
||||
tip,
|
||||
None,
|
||||
)?;
|
||||
let offsets = provider.user_header().changeset_offsets();
|
||||
let offsets = provider.read_changeset_offsets()?;
|
||||
assert!(offsets.is_some(), "Should have changeset offsets");
|
||||
}
|
||||
|
||||
@@ -1351,4 +1355,42 @@ mod tests {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
fn test_last_block_flushed_on_commit() {
    let (static_dir, _) = create_test_static_files_dir();

    let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
        .expect("Failed to create static file provider");

    let address = Address::from([5u8; 20]);
    let key = B256::with_last_byte(1);

    // A single changeset entry for block 0. `increment_block` is never called explicitly here
    // (append_storage_changeset calls it internally), and no later block follows, so the
    // commit itself has to flush the offset of the block that is still in progress.
    let block_zero_changes = vec![StorageBeforeTx { address, key, value: U256::from(42) }];

    let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
    writer.append_storage_changeset(block_zero_changes, 0).unwrap();
    writer.commit().unwrap();
    // Release the writer before reading back through the provider.
    drop(writer);

    // The committed segment must report block 0 as its highest block.
    let highest = sf_rw.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
    assert_eq!(highest, Some(0), "Should have block 0 after commit");

    // The entry must be reachable through the high-level read API.
    let result = sf_rw.get_storage_before_block(0, address, key).unwrap();
    assert!(result.is_some(), "Should be able to read the changeset entry");
    let entry = result.unwrap();
    assert_eq!(entry.value, U256::from(42));
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,10 @@ use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
|
||||
use reth_db_api::models::CompactU256;
|
||||
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
|
||||
use reth_node_types::NodePrimitives;
|
||||
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
|
||||
use reth_static_file_types::{
|
||||
ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
|
||||
SegmentRangeInclusive, StaticFileSegment,
|
||||
};
|
||||
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
@@ -219,6 +222,10 @@ pub struct StaticFileProviderRW<N> {
|
||||
prune_on_commit: Option<PruneStrategy>,
|
||||
/// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
|
||||
synced: bool,
|
||||
/// Changeset offsets sidecar writer (only for changeset segments).
|
||||
changeset_offsets: Option<ChangesetOffsetWriter>,
|
||||
/// Current block's changeset offset being written.
|
||||
current_changeset_offset: Option<ChangesetOffset>,
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
@@ -233,6 +240,8 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
metrics: Option<Arc<StaticFileProviderMetrics>>,
|
||||
) -> ProviderResult<Self> {
|
||||
let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
|
||||
|
||||
// Create writer WITHOUT sidecar first - we'll add it after healing
|
||||
let mut writer = Self {
|
||||
writer,
|
||||
data_path,
|
||||
@@ -241,10 +250,19 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
metrics,
|
||||
prune_on_commit: None,
|
||||
synced: false,
|
||||
changeset_offsets: None,
|
||||
current_changeset_offset: None,
|
||||
};
|
||||
|
||||
// Run NippyJar healing BEFORE setting up changeset sidecar
|
||||
// This may reduce rows, which affects valid sidecar offsets
|
||||
writer.ensure_end_range_consistency()?;
|
||||
|
||||
// Now set up changeset sidecar with post-heal header values
|
||||
if segment.is_change_based() {
|
||||
writer.heal_changeset_sidecar()?;
|
||||
}
|
||||
|
||||
Ok(writer)
|
||||
}
|
||||
|
||||
@@ -336,7 +354,112 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
self.prune_on_commit.is_some()
|
||||
}
|
||||
|
||||
/// Syncs all data (rows and offsets) to disk.
|
||||
/// Heals the changeset offset sidecar after NippyJar healing.
|
||||
///
|
||||
/// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
|
||||
/// If offsets point past the current row count, we truncate the sidecar to match.
|
||||
fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
|
||||
let csoff_path = self.data_path.with_extension("csoff");
|
||||
let committed_len = self.writer.user_header().changeset_offsets_len();
|
||||
let actual_rows = self.writer.rows() as u64;
|
||||
|
||||
// Open the sidecar writer (this handles partial records and uncommitted tail)
|
||||
let mut csoff_writer =
|
||||
ChangesetOffsetWriter::new(&csoff_path, committed_len).map_err(ProviderError::other)?;
|
||||
|
||||
// If we have offsets, validate they don't exceed actual rows
|
||||
if committed_len > 0 {
|
||||
// Edge case: if actual_rows is 0 but we have offsets, all offsets are invalid
|
||||
if actual_rows == 0 {
|
||||
tracing::warn!(
|
||||
target: "reth::static_file",
|
||||
path = %csoff_path.display(),
|
||||
committed_len,
|
||||
"Truncating all changeset offsets - data file is empty"
|
||||
);
|
||||
csoff_writer.truncate(0).map_err(ProviderError::other)?;
|
||||
self.writer.user_header_mut().set_changeset_offsets_len(0);
|
||||
self.writer.user_header_mut().prune(committed_len);
|
||||
self.writer.commit().map_err(ProviderError::other)?;
|
||||
self.changeset_offsets = Some(csoff_writer);
|
||||
return Ok(());
|
||||
}
|
||||
// Use with_len to respect committed length (writer already healed file to this length)
|
||||
let mut reader =
|
||||
ChangesetOffsetReader::with_len(&csoff_path, committed_len)
|
||||
.map_err(ProviderError::other)?;
|
||||
|
||||
if let Some(last_offset) =
|
||||
reader.get(committed_len - 1).map_err(ProviderError::other)?
|
||||
{
|
||||
let end_row = last_offset.offset() + last_offset.num_changes();
|
||||
|
||||
if end_row > actual_rows {
|
||||
// Sidecar has offsets pointing past EOF - need to truncate blocks
|
||||
// Find the last valid block (where offset + num_changes <= actual_rows)
|
||||
let mut valid_blocks = 0u64;
|
||||
for i in 0..committed_len {
|
||||
if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
|
||||
if offset.offset() + offset.num_changes() <= actual_rows {
|
||||
valid_blocks = i + 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::warn!(
|
||||
target: "reth::static_file",
|
||||
path = %csoff_path.display(),
|
||||
committed_len,
|
||||
actual_rows,
|
||||
valid_blocks,
|
||||
"Truncating changeset offsets that point past healed data"
|
||||
);
|
||||
|
||||
// Truncate sidecar to valid blocks
|
||||
csoff_writer.truncate(valid_blocks).map_err(ProviderError::other)?;
|
||||
|
||||
// Update header to match
|
||||
self.writer.user_header_mut().set_changeset_offsets_len(valid_blocks);
|
||||
|
||||
// Also need to update block range if we removed blocks
|
||||
if valid_blocks < committed_len {
|
||||
let blocks_removed = committed_len - valid_blocks;
|
||||
self.writer.user_header_mut().prune(blocks_removed);
|
||||
}
|
||||
|
||||
// Commit the healed state
|
||||
self.writer.commit().map_err(ProviderError::other)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.changeset_offsets = Some(csoff_writer);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
|
||||
///
|
||||
/// This is idempotent - safe to call multiple times. After flushing, the current offset
|
||||
/// is cleared to prevent duplicate writes.
|
||||
///
|
||||
/// This must be called before committing or syncing to ensure the last block's offset
|
||||
/// is persisted, since `increment_block()` only writes the *previous* block's offset.
|
||||
fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
|
||||
if !self.writer.user_header().segment().is_change_based() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(offset) = self.current_changeset_offset.take() &&
|
||||
let Some(writer) = &mut self.changeset_offsets
|
||||
{
|
||||
writer.append(&offset).map_err(ProviderError::other)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
|
||||
///
|
||||
/// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
|
||||
/// configuration and mark the writer as clean.
|
||||
@@ -346,6 +469,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
if self.prune_on_commit.is_some() {
|
||||
return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
|
||||
}
|
||||
|
||||
// Write the final block's offset and sync the sidecar for changeset segments
|
||||
self.flush_current_changeset_offset()?;
|
||||
if let Some(writer) = &mut self.changeset_offsets {
|
||||
writer.sync().map_err(ProviderError::other)?;
|
||||
// Update the header with the actual number of offsets written
|
||||
self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
|
||||
}
|
||||
|
||||
if self.writer.is_dirty() {
|
||||
self.writer.sync_all().map_err(ProviderError::other)?;
|
||||
}
|
||||
@@ -364,7 +496,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
}
|
||||
if self.writer.is_dirty() {
|
||||
if !self.synced {
|
||||
self.writer.sync_all().map_err(ProviderError::other)?;
|
||||
// Must call self.sync_all() to flush changeset offsets and update
|
||||
// the header's changeset_offsets_len, not just the inner writer
|
||||
self.sync_all()?;
|
||||
}
|
||||
|
||||
self.writer.finalize().map_err(ProviderError::other)?;
|
||||
@@ -405,6 +539,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
}
|
||||
}
|
||||
|
||||
// For changeset segments, flush and sync the sidecar file before committing the main file.
|
||||
// This ensures crash consistency: the sidecar is durable before the header references it.
|
||||
self.flush_current_changeset_offset()?;
|
||||
if let Some(writer) = &mut self.changeset_offsets {
|
||||
writer.sync().map_err(ProviderError::other)?;
|
||||
// Update the header with the actual number of offsets written
|
||||
self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
|
||||
}
|
||||
|
||||
if self.writer.is_dirty() {
|
||||
debug!(
|
||||
target: "provider::static_file",
|
||||
@@ -548,7 +691,16 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
let (writer, data_path) =
|
||||
Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
|
||||
self.writer = writer;
|
||||
self.data_path = data_path;
|
||||
self.data_path = data_path.clone();
|
||||
|
||||
// Update changeset offsets writer for the new file (starts empty)
|
||||
if segment.is_change_based() {
|
||||
let csoff_path = data_path.with_extension("csoff");
|
||||
self.changeset_offsets = Some(
|
||||
ChangesetOffsetWriter::new(&csoff_path, 0)
|
||||
.map_err(ProviderError::other)?,
|
||||
);
|
||||
}
|
||||
|
||||
*self.writer.user_header_mut() = SegmentHeader::new(
|
||||
self.reader().find_fixed_range(segment, last_block + 1),
|
||||
@@ -560,6 +712,20 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
}
|
||||
|
||||
self.writer.user_header_mut().increment_block();
|
||||
|
||||
// Handle changeset offset tracking for changeset segments
|
||||
if segment.is_change_based() {
|
||||
// Write previous block's offset if we have one
|
||||
if let Some(offset) = self.current_changeset_offset.take() &&
|
||||
let Some(writer) = &mut self.changeset_offsets
|
||||
{
|
||||
writer.append(&offset).map_err(ProviderError::other)?;
|
||||
}
|
||||
// Start tracking new block's offset
|
||||
let new_offset = self.writer.rows() as u64;
|
||||
self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
|
||||
}
|
||||
|
||||
if let Some(metrics) = &self.metrics {
|
||||
metrics.record_segment_operation(
|
||||
segment,
|
||||
@@ -631,13 +797,6 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
expected_block_start = self.writer.user_header().expected_block_start();
|
||||
}
|
||||
|
||||
// Now we're in the correct file, we need to find how many rows to prune
|
||||
// We need to iterate through the changesets to find the correct position
|
||||
// Since changesets are stored per block, we need to find the offset for the block
|
||||
let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| {
|
||||
ProviderError::other(StaticFileWriterError::new("Missing changeset offsets"))
|
||||
})?;
|
||||
|
||||
// Find the number of rows to keep (up to and including last_block)
|
||||
let blocks_to_keep = if last_block >= expected_block_start {
|
||||
last_block - expected_block_start + 1
|
||||
@@ -645,18 +804,27 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
0
|
||||
};
|
||||
|
||||
// Read changeset offsets from sidecar file to find where to truncate
|
||||
let csoff_path = self.data_path.with_extension("csoff");
|
||||
let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
|
||||
|
||||
let rows_to_keep = if blocks_to_keep == 0 {
|
||||
0
|
||||
} else if blocks_to_keep as usize > changeset_offsets.len() {
|
||||
// Keep all rows in this file (shouldn't happen if data is consistent)
|
||||
self.writer.rows() as u64
|
||||
} else if blocks_to_keep as usize == changeset_offsets.len() {
|
||||
// Keep all rows
|
||||
} else if blocks_to_keep >= changeset_offsets_len {
|
||||
// Keep all rows in this file
|
||||
self.writer.rows() as u64
|
||||
} else {
|
||||
// Find the offset for the block after last_block
|
||||
// This gives us the number of rows to keep
|
||||
changeset_offsets[blocks_to_keep as usize].offset()
|
||||
// Read offset for the block after last_block from sidecar
|
||||
// Use with_len to respect the committed length from header, not file length
|
||||
let mut reader =
|
||||
ChangesetOffsetReader::with_len(&csoff_path, changeset_offsets_len)
|
||||
.map_err(ProviderError::other)?;
|
||||
if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
|
||||
next_offset.offset()
|
||||
} else {
|
||||
// If we can't read the offset, keep all rows
|
||||
self.writer.rows() as u64
|
||||
}
|
||||
};
|
||||
|
||||
let total_rows = self.writer.rows() as u64;
|
||||
@@ -684,6 +852,14 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
// Sync changeset offsets to match the new block range
|
||||
self.writer.user_header_mut().sync_changeset_offsets();
|
||||
|
||||
// Truncate the sidecar file to match the new block count
|
||||
if let Some(writer) = &mut self.changeset_offsets {
|
||||
writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
|
||||
}
|
||||
|
||||
// Clear current changeset offset tracking since we've pruned
|
||||
self.current_changeset_offset = None;
|
||||
|
||||
// Commits new changes to disk
|
||||
self.commit()?;
|
||||
|
||||
@@ -764,16 +940,36 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
/// Delete the current static file, and replace this provider writer with the previous static
|
||||
/// file.
|
||||
fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
|
||||
let segment = self.user_header().segment();
|
||||
let current_path = self.data_path.clone();
|
||||
let (previous_writer, data_path) = Self::open(
|
||||
self.user_header().segment(),
|
||||
segment,
|
||||
self.writer.user_header().expected_block_start() - 1,
|
||||
self.reader.clone(),
|
||||
self.metrics.clone(),
|
||||
)?;
|
||||
self.writer = previous_writer;
|
||||
self.writer.set_dirty();
|
||||
self.data_path = data_path;
|
||||
self.data_path = data_path.clone();
|
||||
|
||||
// Delete the sidecar file for changeset segments before deleting the main jar
|
||||
if segment.is_change_based() {
|
||||
let csoff_path = current_path.with_extension("csoff");
|
||||
if csoff_path.exists() {
|
||||
std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
|
||||
}
|
||||
// Re-initialize the changeset offsets writer for the previous file
|
||||
let new_csoff_path = data_path.with_extension("csoff");
|
||||
let committed_len = self.writer.user_header().changeset_offsets_len();
|
||||
self.changeset_offsets = Some(
|
||||
ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
|
||||
.map_err(ProviderError::other)?,
|
||||
);
|
||||
}
|
||||
|
||||
// Clear current changeset offset tracking since we're switching files
|
||||
self.current_changeset_offset = None;
|
||||
|
||||
NippyJar::<SegmentHeader>::load(&current_path)
|
||||
.map_err(ProviderError::other)?
|
||||
.delete()
|
||||
@@ -817,10 +1013,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
|
||||
/// Appends change to changeset static file.
|
||||
fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
|
||||
if self.writer.user_header().changeset_offsets().is_some() {
|
||||
self.writer.user_header_mut().increment_block_changes();
|
||||
if let Some(ref mut offset) = self.current_changeset_offset {
|
||||
offset.increment_num_changes();
|
||||
}
|
||||
|
||||
self.append_column(change)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user