From 6953971c2fb0243818a8a475f3b3773a4a1e1d90 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Fri, 6 Feb 2026 05:31:31 -0800 Subject: [PATCH] feat(static-file): incremental changeset offset storage (#21596) Co-authored-by: Amp Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com> Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> --- .changelog/quiet-foxes-dance.md | 6 + Cargo.lock | 118 +-- crates/exex/exex/Cargo.toml | 1 + crates/static-file/static-file/Cargo.toml | 1 + crates/static-file/types/Cargo.toml | 4 + .../types/src/changeset_offsets.rs | 437 ++++++++++ crates/static-file/types/src/lib.rs | 9 +- crates/static-file/types/src/segment.rs | 160 ++-- ...es__segment__tests__AccountChangeSets.snap | 2 +- ...es__segment__tests__StorageChangeSets.snap | 2 +- crates/storage/nippy-jar/src/lib.rs | 17 +- crates/storage/provider/Cargo.toml | 4 +- .../provider/src/providers/static_file/jar.rs | 58 ++ .../src/providers/static_file/manager.rs | 55 +- .../provider/src/providers/static_file/mod.rs | 62 +- .../src/providers/static_file/writer.rs | 297 ++++++- .../src/providers/static_file/writer_tests.rs | 812 ++++++++++++++++++ 17 files changed, 1847 insertions(+), 198 deletions(-) create mode 100644 .changelog/quiet-foxes-dance.md create mode 100644 crates/static-file/types/src/changeset_offsets.rs create mode 100644 crates/storage/provider/src/providers/static_file/writer_tests.rs diff --git a/.changelog/quiet-foxes-dance.md b/.changelog/quiet-foxes-dance.md new file mode 100644 index 0000000000..6581ebb5e3 --- /dev/null +++ b/.changelog/quiet-foxes-dance.md @@ -0,0 +1,6 @@ +--- +reth-static-file-types: patch +reth-provider: patch +--- + +Move changeset offsets from segment header to external `.csoff` sidecar file for incremental writes and crash recovery. diff --git a/Cargo.lock b/Cargo.lock index c8bd29d516..3511c39dc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,9 +516,9 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.12" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f70d83b765fdc080dbcd4f4db70d8d23fe4761f2f02ebfa9146b833900634b4" +checksum = "e93e50f64a77ad9c5470bf2ad0ca02f228da70c792a8f06634801e202579f35e" dependencies = [ "alloy-rlp-derive", "arrayvec", @@ -527,9 +527,9 @@ dependencies = [ [[package]] name = "alloy-rlp-derive" -version = "0.3.12" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64b728d511962dda67c1bc7ea7c03736ec275ed2cf4c35d9585298ac9ccf3b73" +checksum = "ce8849c74c9ca0f5a03da1c865e3eb6f768df816e67dd3721a398a8a7e398011" dependencies = [ "proc-macro2", "quote", @@ -1001,7 +1001,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1012,7 +1012,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2175,9 +2175,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.56" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e" +checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" dependencies = [ "clap_builder", "clap_derive", @@ -2185,9 +2185,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.56" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0" +checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" dependencies = [ "anstream", "anstyle", @@ -3093,7 +3093,7 @@ dependencies = [ "libc", "option-ext", "redox_users 0.5.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3423,7 +3423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4006,9 +4006,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99" [[package]] name = "flate2" -version = "1.1.8" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b375d6465b98090a5f25b1c7703f3859783755aa9a80433b36e0379a3ec2f369" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", "miniz_oxide", @@ -4693,7 +4693,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] @@ -4711,14 +4711,13 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ "base64 0.22.1", "bytes", "futures-channel", - "futures-core", "futures-util", "http", "http-body", @@ -4999,9 +4998,9 @@ dependencies = [ [[package]] name = "insta" -version = "1.46.2" +version = "1.46.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38c91d64f9ad425e80200a50a0e8b8a641680b44e33ce832efe5b8bc65161b07" +checksum = "e82db8c87c7f1ccecb34ce0c24399b8a73081427f3c7c50a5d597925356115e4" dependencies = [ "console", "once_cell", @@ -5033,9 +5032,9 @@ dependencies = [ [[package]] name = "interprocess" -version = "2.2.3" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d" +checksum = "7b00d05442c2106c75b7410f820b152f61ec0edc7befcb9b381b673a20314753" dependencies = [ "doctest-file", "futures-core", @@ -5091,7 +5090,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5683,6 +5682,12 @@ dependencies = [ "libc", ] +[[package]] +name = "mach2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dae608c151f68243f2b000364e1f7b186d9c29845f7d2d85bd31b9ad77ad552b" + [[package]] name = "macro-string" version = "0.1.4" @@ -5788,13 +5793,13 @@ dependencies = [ [[package]] name = "metrics-process" -version = "2.4.2" +version = "2.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f615e08e049bd14a44c4425415782efb9bcd479fc1e19ddeb971509074c060d0" +checksum = "4268d87f64a752f5a651314fc683f04da10be65701ea3e721ba4d74f79163cac" dependencies = [ "libc", "libproc", - "mach2", + "mach2 0.6.0", "metrics", "once_cell", "procfs", @@ -6045,7 +6050,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7406,9 +7411,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.12.2" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" dependencies = [ "aho-corasick", "memchr", @@ -7418,9 +7423,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" dependencies = [ "aho-corasick", "memchr", @@ -7429,9 +7434,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" [[package]] name = "regress" @@ -7489,7 +7494,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] @@ -9639,6 +9644,7 @@ dependencies = [ "reth-ethereum-engine-primitives", "reth-ethereum-primitives", "reth-execution-types", + "reth-fs-util", "reth-metrics", "reth-nippy-jar", "reth-node-types", @@ -10347,6 +10353,8 @@ dependencies = [ "serde", "serde_json", "strum", + "tempfile", + "tracing", ] [[package]] @@ -10993,9 +11001,9 @@ dependencies = [ [[package]] name = "rlimit" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a" +checksum = "f35ee2729c56bb610f6dba436bf78135f728b7373bdffae2ec815b2d3eb98cc3" dependencies = [ "libc", ] @@ -11186,7 +11194,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -11326,9 +11334,9 @@ dependencies = [ [[package]] name = "schemars" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ "dyn-clone", "ref-cast", @@ -11590,7 +11598,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.13.0", "schemars 0.9.0", - "schemars 1.2.0", + "schemars 1.2.1", "serde_core", "serde_json", "serde_with_macros", @@ -12035,7 +12043,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -12656,7 +12664,7 @@ dependencies = [ "cfg-if", "itoa", "libc", - "mach2", + "mach2 0.5.0", "memmap2", "smallvec", "tracing-core", @@ -13221,14 +13229,14 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" dependencies = [ - "webpki-root-certs 1.0.5", + "webpki-root-certs 1.0.6", ] [[package]] name = "webpki-root-certs" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" dependencies = [ "rustls-pki-types", ] @@ -13239,14 +13247,14 @@ version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" dependencies = [ - "webpki-roots 1.0.5", + "webpki-roots 1.0.6", ] [[package]] name = "webpki-roots" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" dependencies = [ "rustls-pki-types", ] @@ -13279,7 +13287,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -13807,18 +13815,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.37" +version = "0.8.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7456cf00f0685ad319c5b1693f291a650eaf345e941d082fc4e03df8a03996ac" +checksum = "57cf3aa6855b23711ee9852dfc97dfaa51c45feaba5b645d0c777414d494a961" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.37" +version = "0.8.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0" +checksum = "8a616990af1a287837c4fe6596ad77ef57948f787e46ce28e166facc0cc1cb75" dependencies = [ "proc-macro2", "quote", @@ -13902,9 +13910,9 @@ dependencies = [ [[package]] name = "zmij" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1966f8ac2c1f76987d69a74d0e0f929241c10e78136434e3be70ff7f58f64214" +checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" [[package]] name = "zstd" diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 189cd50965..28712c576c 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -67,6 +67,7 @@ tempfile.workspace = true [features] default = [] +edge = ["reth-provider/edge"] serde = [ "reth-exex-types/serde", "reth-revm/serde", diff --git a/crates/static-file/static-file/Cargo.toml b/crates/static-file/static-file/Cargo.toml index 7ea23e0132..0154fa8be7 100644 --- a/crates/static-file/static-file/Cargo.toml +++ b/crates/static-file/static-file/Cargo.toml @@ -38,3 +38,4 @@ assert_matches.workspace = true tempfile.workspace = true [features] +edge = ["reth-stages/edge"] diff --git a/crates/static-file/types/Cargo.toml b/crates/static-file/types/Cargo.toml index 540869ef6a..863f197431 100644 --- a/crates/static-file/types/Cargo.toml +++ b/crates/static-file/types/Cargo.toml @@ -13,6 +13,7 @@ workspace = true [dependencies] alloy-primitives.workspace = true +tracing = { workspace = true, optional = true } clap = { workspace = true, features = ["derive"], optional = true } fixed-map.workspace = true @@ -25,6 +26,7 @@ strum = { workspace = true, features = ["derive"] } reth-nippy-jar.workspace = true serde_json.workspace = true insta.workspace = true +tempfile.workspace = true [features] default = ["std"] @@ -36,5 +38,7 @@ std = [ "strum/std", "serde_json/std", "fixed-map/std", + "dep:tracing", + "tracing?/std", ] clap = ["dep:clap"] diff --git a/crates/static-file/types/src/changeset_offsets.rs b/crates/static-file/types/src/changeset_offsets.rs new file mode 100644 index 0000000000..856a4f3533 --- /dev/null +++ b/crates/static-file/types/src/changeset_offsets.rs @@ -0,0 +1,437 @@ +//! Changeset offset sidecar file I/O. +//! +//! Provides append-only writing and O(1) random-access reading for changeset offsets. +//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`. + +use crate::ChangesetOffset; +use std::{ + fs::{File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + path::Path, +}; + +/// Writer for appending changeset offsets to a sidecar file. +#[derive(Debug)] +pub struct ChangesetOffsetWriter { + file: File, + /// Number of records written. + records_written: u64, +} + +impl ChangesetOffsetWriter { + /// Record size in bytes. + const RECORD_SIZE: usize = 16; + + /// Opens or creates the changeset offset file for appending. + /// + /// The file is healed to match `committed_len` (from the segment header): + /// - Partial records (from crash mid-write) are truncated to record boundary + /// - Extra complete records (from crash after sidecar sync but before header commit) are + /// truncated to match the committed length + /// - If the file has fewer records than committed, returns an error (data corruption) + /// + /// This mirrors `NippyJar`'s healing behavior where config/header is the commit boundary. + pub fn new(path: impl AsRef, committed_len: u64) -> io::Result { + let file = OpenOptions::new() + .create(true) + .truncate(false) + .read(true) + .write(true) + .open(path.as_ref())?; + + let file_len = file.metadata()?.len(); + let remainder = file_len % Self::RECORD_SIZE as u64; + + // First, truncate any partial record from crash mid-write + let aligned_len = if remainder != 0 { + let truncated_len = file_len - remainder; + tracing::warn!( + target: "reth::static_file", + path = %path.as_ref().display(), + original_len = file_len, + truncated_len, + "Truncating partial changeset offset record" + ); + file.set_len(truncated_len)?; + file.sync_all()?; // Sync required for crash safety + truncated_len + } else { + file_len + }; + + let records_in_file = aligned_len / Self::RECORD_SIZE as u64; + + // Heal sidecar to match committed header length + match records_in_file.cmp(&committed_len) { + std::cmp::Ordering::Greater => { + // Sidecar has uncommitted records from a crash - truncate them + let target_len = committed_len * Self::RECORD_SIZE as u64; + tracing::warn!( + target: "reth::static_file", + path = %path.as_ref().display(), + sidecar_records = records_in_file, + committed_len, + "Truncating uncommitted changeset offset records after crash recovery" + ); + file.set_len(target_len)?; + file.sync_all()?; // Sync required for crash safety + } + std::cmp::Ordering::Less => { + // INVARIANT VIOLATION: This should be impossible if healing ran correctly. + // + // All code paths call `heal_changeset_sidecar()` before this function, which + // validates the sidecar against NippyJar state and corrects the header to match + // the actual file size. Therefore, `committed_len` should always equal or exceed + // `records_in_file` when this function is called. + // + // If we reach this error, it indicates: + // - A bug in the healing logic (header not corrected properly) + // - This function was called directly without going through healing + // - External corruption occurred between healing and opening (extremely unlikely) + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \ + (healing should have prevented this - possible bug in healing logic): {}", + records_in_file, + committed_len, + path.as_ref().display() + ), + )); + } + std::cmp::Ordering::Equal => {} + } + + let records_written = committed_len; + let file = OpenOptions::new().create(true).append(true).open(path)?; + + Ok(Self { file, records_written }) + } + + /// Appends a single changeset offset record. + pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> { + let mut buf = [0u8; Self::RECORD_SIZE]; + buf[..8].copy_from_slice(&offset.offset().to_le_bytes()); + buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes()); + self.file.write_all(&buf)?; + self.records_written += 1; + Ok(()) + } + + /// Appends multiple changeset offset records. + pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> { + for offset in offsets { + self.append(offset)?; + } + Ok(()) + } + + /// Syncs all data to disk. Must be called before committing the header. + pub fn sync(&mut self) -> io::Result<()> { + self.file.sync_all() + } + + /// Truncates the file to contain exactly `len` records and syncs to disk. + /// Used after prune operations to reclaim space. + /// + /// The sync is required for crash safety - without it, a crash could + /// resurrect the old file length. + pub fn truncate(&mut self, len: u64) -> io::Result<()> { + self.file.set_len(len * Self::RECORD_SIZE as u64)?; + self.file.sync_all()?; + self.records_written = len; + Ok(()) + } + + /// Returns the number of records in the file. + pub const fn len(&self) -> u64 { + self.records_written + } + + /// Returns true if the file is empty. + pub const fn is_empty(&self) -> bool { + self.records_written == 0 + } +} + +/// Reader for changeset offsets with O(1) random access. +#[derive(Debug)] +pub struct ChangesetOffsetReader { + file: File, + /// Cached file length in records. + len: u64, +} + +impl ChangesetOffsetReader { + /// Record size in bytes. + const RECORD_SIZE: usize = 16; + + /// Opens the changeset offset file for reading with an explicit length. + /// + /// The `len` parameter (from header metadata) bounds the reader - any records + /// beyond this length are ignored. This ensures we only read committed data. + pub fn new(path: impl AsRef, len: u64) -> io::Result { + let file = File::open(path)?; + Ok(Self { file, len }) + } + + /// Reads a single changeset offset by block index. + /// Returns None if index is out of bounds. + pub fn get(&mut self, block_index: u64) -> io::Result> { + if block_index >= self.len { + return Ok(None); + } + + let byte_pos = block_index * Self::RECORD_SIZE as u64; + self.file.seek(SeekFrom::Start(byte_pos))?; + + let mut buf = [0u8; Self::RECORD_SIZE]; + self.file.read_exact(&mut buf)?; + + let offset = u64::from_le_bytes(buf[..8].try_into().unwrap()); + let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap()); + + Ok(Some(ChangesetOffset::new(offset, num_changes))) + } + + /// Reads a range of changeset offsets. + pub fn get_range(&mut self, start: u64, end: u64) -> io::Result> { + let end = end.min(self.len); + if start >= end { + return Ok(Vec::new()); + } + + let count = (end - start) as usize; + let byte_pos = start * Self::RECORD_SIZE as u64; + self.file.seek(SeekFrom::Start(byte_pos))?; + + let mut result = Vec::with_capacity(count); + let mut buf = [0u8; Self::RECORD_SIZE]; + + for _ in 0..count { + self.file.read_exact(&mut buf)?; + let offset = u64::from_le_bytes(buf[..8].try_into().unwrap()); + let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap()); + result.push(ChangesetOffset::new(offset, num_changes)); + } + + Ok(result) + } + + /// Returns the number of valid records. + pub const fn len(&self) -> u64 { + self.len + } + + /// Returns true if there are no records. + pub const fn is_empty(&self) -> bool { + self.len == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn test_write_and_read() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Write (new file, committed_len=0) + { + let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap(); + writer.append(&ChangesetOffset::new(0, 5)).unwrap(); + writer.append(&ChangesetOffset::new(5, 3)).unwrap(); + writer.append(&ChangesetOffset::new(8, 10)).unwrap(); + writer.sync().unwrap(); + assert_eq!(writer.len(), 3); + } + + // Read + { + let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap(); + assert_eq!(reader.len(), 3); + + let entry = reader.get(0).unwrap().unwrap(); + assert_eq!(entry.offset(), 0); + assert_eq!(entry.num_changes(), 5); + + let entry = reader.get(1).unwrap().unwrap(); + assert_eq!(entry.offset(), 5); + assert_eq!(entry.num_changes(), 3); + + let entry = reader.get(2).unwrap().unwrap(); + assert_eq!(entry.offset(), 8); + assert_eq!(entry.num_changes(), 10); + + assert!(reader.get(3).unwrap().is_none()); + } + } + + #[test] + fn test_truncate() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap(); + writer.append(&ChangesetOffset::new(0, 1)).unwrap(); + writer.append(&ChangesetOffset::new(1, 2)).unwrap(); + writer.append(&ChangesetOffset::new(3, 3)).unwrap(); + writer.sync().unwrap(); + + writer.truncate(2).unwrap(); + assert_eq!(writer.len(), 2); + + let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap(); + assert_eq!(reader.len(), 2); + assert!(reader.get(2).unwrap().is_none()); + } + + #[test] + fn test_partial_record_recovery() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Write 1 full record (16 bytes) + 8 trailing bytes (partial record) + { + let mut file = std::fs::File::create(&path).unwrap(); + // Full record: offset=100, num_changes=5 + file.write_all(&100u64.to_le_bytes()).unwrap(); + file.write_all(&5u64.to_le_bytes()).unwrap(); + // Partial record: only 8 bytes (incomplete) + file.write_all(&200u64.to_le_bytes()).unwrap(); + file.sync_all().unwrap(); + } + + // Verify file has 24 bytes before opening with writer + assert_eq!(std::fs::metadata(&path).unwrap().len(), 24); + + // Open with writer, committed_len=1 (header committed 1 record) + // Should truncate partial record and match committed length + let writer = ChangesetOffsetWriter::new(&path, 1).unwrap(); + assert_eq!(writer.len(), 1); + + // Verify file was truncated to 16 bytes + assert_eq!(std::fs::metadata(&path).unwrap().len(), 16); + + // Verify the complete record is readable + let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap(); + assert_eq!(reader.len(), 1); + let entry = reader.get(0).unwrap().unwrap(); + assert_eq!(entry.offset(), 100); + assert_eq!(entry.num_changes(), 5); + } + + #[test] + fn test_len_bounds_reads() { + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Write 3 records + { + let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap(); + writer.append(&ChangesetOffset::new(0, 10)).unwrap(); + writer.append(&ChangesetOffset::new(10, 20)).unwrap(); + writer.append(&ChangesetOffset::new(30, 30)).unwrap(); + writer.sync().unwrap(); + assert_eq!(writer.len(), 3); + } + + // Open with len=2, ignoring the 3rd record + let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap(); + assert_eq!(reader.len(), 2); + + // First two records should be readable + let entry0 = reader.get(0).unwrap().unwrap(); + assert_eq!(entry0.offset(), 0); + assert_eq!(entry0.num_changes(), 10); + + let entry1 = reader.get(1).unwrap().unwrap(); + assert_eq!(entry1.offset(), 10); + assert_eq!(entry1.num_changes(), 20); + + // Third record should be out of bounds (due to len=2) + assert!(reader.get(2).unwrap().is_none()); + + // get_range should also respect the len bound + let range = reader.get_range(0, 5).unwrap(); + assert_eq!(range.len(), 2); + } + + #[test] + fn test_truncate_uncommitted_records_on_open() { + // Simulates crash recovery where sidecar has more records than committed header length. + // ChangesetOffsetWriter::new() should automatically truncate to committed_len. + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Simulate: wrote 3 records, synced sidecar, but header only committed len=2 + { + let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap(); + writer.append(&ChangesetOffset::new(0, 5)).unwrap(); + writer.append(&ChangesetOffset::new(5, 10)).unwrap(); + writer.append(&ChangesetOffset::new(15, 7)).unwrap(); // uncommitted + writer.sync().unwrap(); + assert_eq!(writer.len(), 3); + } + + // On "restart", new() heals by truncating to committed length + let committed_len = 2u64; + { + let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap(); + assert_eq!(writer.len(), 2); // Healed to committed length + } + + // Verify file is now correct length and new appends go to the right place + { + let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap(); + assert_eq!(writer.len(), 2); + + // Append a new record - should be at index 2, not index 3 + writer.append(&ChangesetOffset::new(15, 20)).unwrap(); + writer.sync().unwrap(); + assert_eq!(writer.len(), 3); + } + + // Verify the records are correct + { + let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap(); + assert_eq!(reader.len(), 3); + + let entry0 = reader.get(0).unwrap().unwrap(); + assert_eq!(entry0.offset(), 0); + assert_eq!(entry0.num_changes(), 5); + + let entry1 = reader.get(1).unwrap().unwrap(); + assert_eq!(entry1.offset(), 5); + assert_eq!(entry1.num_changes(), 10); + + // This should be the NEW record, not the old uncommitted one + let entry2 = reader.get(2).unwrap().unwrap(); + assert_eq!(entry2.offset(), 15); + assert_eq!(entry2.num_changes(), 20); // Not 7 from the old uncommitted record + } + } + + #[test] + fn test_sidecar_shorter_than_committed_errors() { + // If sidecar has fewer records than committed, it's data corruption - should error. + let dir = tempdir().unwrap(); + let path = dir.path().join("test.csoff"); + + // Write 1 record + { + let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap(); + writer.append(&ChangesetOffset::new(0, 5)).unwrap(); + writer.sync().unwrap(); + } + + // Try to open with committed_len=3 (header claims more than file has) + let result = ChangesetOffsetWriter::new(&path, 3); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } +} diff --git a/crates/static-file/types/src/lib.rs b/crates/static-file/types/src/lib.rs index 02998cca77..5a37e82663 100644 --- a/crates/static-file/types/src/lib.rs +++ b/crates/static-file/types/src/lib.rs @@ -15,11 +15,18 @@ mod compression; mod event; mod segment; +#[cfg(feature = "std")] +mod changeset_offsets; +#[cfg(feature = "std")] +pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter}; + use alloy_primitives::BlockNumber; pub use compression::Compression; use core::ops::RangeInclusive; pub use event::StaticFileProducerEvent; -pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment}; +pub use segment::{ + ChangesetOffset, SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment, +}; /// Map keyed by [`StaticFileSegment`]. pub type StaticFileMap = alloc::boxed::Box>; diff --git a/crates/static-file/types/src/segment.rs b/crates/static-file/types/src/segment.rs index d4f299d126..019b87d98a 100644 --- a/crates/static-file/types/src/segment.rs +++ b/crates/static-file/types/src/segment.rs @@ -1,5 +1,5 @@ use crate::{find_fixed_range, BlockNumber, Compression}; -use alloc::{format, string::String, vec::Vec}; +use alloc::{format, string::String}; use alloy_primitives::TxNumber; use core::{ ops::{Range, RangeInclusive}, @@ -238,6 +238,11 @@ pub struct ChangesetOffset { } impl ChangesetOffset { + /// Creates a new changeset offset. + pub const fn new(offset: u64, num_changes: u64) -> Self { + Self { offset, num_changes } + } + /// Returns the start offset for the row for this block pub const fn offset(&self) -> u64 { self.offset @@ -252,6 +257,11 @@ impl ChangesetOffset { pub const fn changeset_range(&self) -> Range { self.offset..(self.offset + self.num_changes) } + + /// Increments the number of changes by 1. + pub const fn increment_num_changes(&mut self) { + self.num_changes += 1; + } } /// A segment header that contains information common to all segments. Used for storage. @@ -269,7 +279,7 @@ pub struct SegmentHeader { /// Segment type segment: StaticFileSegment, /// List of offsets, for where each block's changeset starts. - changeset_offsets: Option>, + changeset_offsets_len: u64, } struct SegmentHeaderVisitor; @@ -298,21 +308,18 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor { let segment: StaticFileSegment = seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?; - let changeset_offsets = if segment.is_change_based() { - // Try to read the 5th field (changeset_offsets) - // If it doesn't exist (old format), this will return None - match seq.next_element()? { - Some(Some(offsets)) => Some(offsets), - // Changesets should have offsets - Some(None) => None, + let changeset_offsets_len = if segment.is_change_based() { + // Try to read the 5th field (changeset_offsets_len) + match seq.next_element::()? { + Some(len) => len, None => { return Err(serde::de::Error::custom( - "changeset_offsets should exist for static files", + "changeset_offsets_len should exist for changeset static files", )) } } } else { - None + 0 }; Ok(SegmentHeader { @@ -320,7 +327,7 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor { block_range, tx_range, segment, - changeset_offsets, + changeset_offsets_len, }) } } @@ -355,7 +362,7 @@ impl Serialize for SegmentHeader { state.serialize_field("segment", &self.segment)?; if self.segment.is_change_based() { - state.serialize_field("changeset_offsets", &self.changeset_offsets)?; + state.serialize_field("changeset_offsets", &self.changeset_offsets_len)?; } state.end() @@ -370,7 +377,7 @@ impl SegmentHeader { tx_range: Option, segment: StaticFileSegment, ) -> Self { - Self { expected_block_range, block_range, tx_range, segment, changeset_offsets: None } + Self { expected_block_range, block_range, tx_range, segment, changeset_offsets_len: 0 } } /// Returns the static file segment kind. @@ -393,9 +400,20 @@ impl SegmentHeader { self.tx_range } - /// Returns the changeset offsets. - pub const fn changeset_offsets(&self) -> Option<&Vec> { - self.changeset_offsets.as_ref() + /// Returns the number of changeset offset entries. + /// The actual offsets are stored in the `.csoff` sidecar file. + pub const fn changeset_offsets_len(&self) -> u64 { + self.changeset_offsets_len + } + + /// Sets the changeset offsets length. + pub const fn set_changeset_offsets_len(&mut self, len: u64) { + self.changeset_offsets_len = len; + } + + /// Increments the changeset offsets length by 1. + pub const fn increment_changeset_offsets_len(&mut self) { + self.changeset_offsets_len += 1; } /// The expected block start of the segment. @@ -453,7 +471,7 @@ impl SegmentHeader { } /// Increments block end range depending on segment - pub fn increment_block(&mut self) -> BlockNumber { + pub const fn increment_block(&mut self) -> BlockNumber { let block_num = if let Some(block_range) = &mut self.block_range { block_range.end += 1; block_range.end @@ -465,20 +483,10 @@ impl SegmentHeader { self.expected_block_start() }; - // For changeset segments, initialize an offset entry for the new block + // For changeset segments, increment the offsets length. + // The actual offset entry is written to the sidecar file by the caller. if self.segment.is_change_based() { - let offsets = self.changeset_offsets.get_or_insert_default(); - // Calculate the offset for the new block - let new_offset = if let Some(last_offset) = offsets.last() { - // The new block starts after the last block's changes - last_offset.offset + last_offset.num_changes - } else { - // First block starts at offset 0 - 0 - }; - - // Add a new offset entry with 0 changes initially - offsets.push(ChangesetOffset { offset: new_offset, num_changes: 0 }); + self.changeset_offsets_len += 1; } block_num @@ -495,23 +503,11 @@ impl SegmentHeader { } } - /// Increments the latest block's number of changes. - pub fn increment_block_changes(&mut self) { - debug_assert!(self.segment().is_change_based()); - if self.segment.is_change_based() { - let offsets = self.changeset_offsets.get_or_insert_with(Default::default); - if let Some(last_offset) = offsets.last_mut() { - last_offset.num_changes += 1; - } else { - // If offsets is empty, we are adding the first change for a block - // The offset for the first block is 0 - offsets.push(ChangesetOffset { offset: 0, num_changes: 1 }); - } - } - } - /// Removes `num` elements from end of tx or block range. - pub fn prune(&mut self, num: u64) { + /// + /// For changeset segments, also decrements the changeset offsets length. + /// The caller must truncate the sidecar file accordingly. + pub const fn prune(&mut self, num: u64) { // Changesets also contain a block range, but are not strictly block-based if self.segment.is_block_or_change_based() { if let Some(range) = &mut self.block_range { @@ -519,26 +515,18 @@ impl SegmentHeader { self.block_range = None; // Clear all changeset offsets if we're clearing all blocks if self.segment.is_change_based() { - self.changeset_offsets = None; + self.changeset_offsets_len = 0; } } else { let old_end = range.end; range.end = range.end.saturating_sub(num); - // Update changeset offsets for account changesets - if self.segment.is_change_based() && - let Some(offsets) = &mut self.changeset_offsets - { + // Update changeset offsets length for changeset segments + if self.segment.is_change_based() { // Calculate how many blocks we're removing let blocks_to_remove = old_end - range.end; - // Remove the last `blocks_to_remove` entries from offsets - let new_len = offsets.len().saturating_sub(blocks_to_remove as usize); - offsets.truncate(new_len); - - // If we removed all offsets, set to None - if offsets.is_empty() { - self.changeset_offsets = None; - } + self.changeset_offsets_len = + self.changeset_offsets_len.saturating_sub(blocks_to_remove); } } }; @@ -561,28 +549,24 @@ impl SegmentHeader { } } - /// Synchronizes changeset offsets with the current block range for account changeset segments. + /// Synchronizes changeset offsets length with the current block range for changeset segments. /// /// This should be called after modifying the block range when dealing with changeset segments - /// to ensure the offsets vector matches the block range size. - pub fn sync_changeset_offsets(&mut self) { + /// to ensure the offsets length matches the block range size. + /// The caller must also truncate the sidecar file accordingly. + pub const fn sync_changeset_offsets(&mut self) { if !self.segment.is_change_based() { return; } if let Some(block_range) = &self.block_range { - if let Some(offsets) = &mut self.changeset_offsets { - let expected_len = (block_range.end - block_range.start + 1) as usize; - if offsets.len() > expected_len { - offsets.truncate(expected_len); - if offsets.is_empty() { - self.changeset_offsets = None; - } - } + let expected_len = block_range.end - block_range.start + 1; + if self.changeset_offsets_len > expected_len { + self.changeset_offsets_len = expected_len; } } else { // No block range means no offsets - self.changeset_offsets = None; + self.changeset_offsets_len = 0; } } @@ -608,21 +592,23 @@ impl SegmentHeader { self.tx_start() } - /// Returns the `ChangesetOffset` corresponding for the given block, if it's in the block - /// range. + /// Returns the index into the sidecar file for a given block's changeset offset. /// - /// If it is not in the block range or the changeset list in the header does not contain a - /// value for the block, this returns `None`. - pub fn changeset_offset(&self, block: BlockNumber) -> Option<&ChangesetOffset> { + /// Returns `None` if the block is not in the block range. + /// To get the changeset offset, the caller must read the offset from the sidecar file at this + /// index. + pub fn changeset_offset_index(&self, block: BlockNumber) -> Option { let block_range = self.block_range()?; if !block_range.contains(block) { - return None + return None; } - let offsets = self.changeset_offsets.as_ref()?; - let index = (block - block_range.start()) as usize; + let index = block - block_range.start(); + if index >= self.changeset_offsets_len { + return None; + } - offsets.get(index) + Some(index) } } @@ -781,42 +767,42 @@ mod tests { block_range: Some(SegmentRangeInclusive::new(0, 100)), tx_range: None, segment: StaticFileSegment::Headers, - changeset_offsets: None, + changeset_offsets_len: 0, }, SegmentHeader { expected_block_range: SegmentRangeInclusive::new(0, 200), block_range: None, tx_range: Some(SegmentRangeInclusive::new(0, 300)), segment: StaticFileSegment::Transactions, - changeset_offsets: None, + changeset_offsets_len: 0, }, SegmentHeader { expected_block_range: SegmentRangeInclusive::new(0, 200), block_range: Some(SegmentRangeInclusive::new(0, 100)), tx_range: Some(SegmentRangeInclusive::new(0, 300)), segment: StaticFileSegment::Receipts, - changeset_offsets: None, + changeset_offsets_len: 0, }, SegmentHeader { expected_block_range: SegmentRangeInclusive::new(0, 200), block_range: Some(SegmentRangeInclusive::new(0, 100)), tx_range: Some(SegmentRangeInclusive::new(0, 300)), segment: StaticFileSegment::TransactionSenders, - changeset_offsets: None, + changeset_offsets_len: 0, }, SegmentHeader { expected_block_range: SegmentRangeInclusive::new(0, 200), block_range: Some(SegmentRangeInclusive::new(0, 100)), tx_range: Some(SegmentRangeInclusive::new(0, 300)), segment: StaticFileSegment::AccountChangeSets, - changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]), + changeset_offsets_len: 100, }, SegmentHeader { expected_block_range: SegmentRangeInclusive::new(0, 200), block_range: Some(SegmentRangeInclusive::new(0, 100)), tx_range: None, segment: StaticFileSegment::StorageChangeSets, - changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]), + changeset_offsets_len: 100, }, ]; // Check that we test all segments diff --git a/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__AccountChangeSets.snap b/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__AccountChangeSets.snap index 90a60a01c8..67508176e6 100644 --- a/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__AccountChangeSets.snap +++ b/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__AccountChangeSets.snap @@ -2,4 +2,4 @@ source: crates/static-file/types/src/segment.rs expression: "Bytes::from(serialized)" --- -0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c01000000000000040000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000 +0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c0100000000000004000000640000000000000001000000000000000000000000000000000000000000000000 diff --git a/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__StorageChangeSets.snap b/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__StorageChangeSets.snap index c1b94903bd..7be5066a03 100644 --- a/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__StorageChangeSets.snap +++ b/crates/static-file/types/src/snapshots/reth_static_file_types__segment__tests__StorageChangeSets.snap @@ -2,4 +2,4 @@ source: crates/static-file/types/src/segment.rs expression: "Bytes::from(serialized)" --- -0x01000000000000000000000000000000c800000000000000010000000000000000640000000000000000050000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000 +0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000005000000640000000000000001000000000000000000000000000000000000000000000000 diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index b924132126..3c57965145 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -59,6 +59,8 @@ const INDEX_FILE_EXTENSION: &str = "idx"; const OFFSETS_FILE_EXTENSION: &str = "off"; /// The file extension used for configuration files. pub const CONFIG_FILE_EXTENSION: &str = "conf"; +/// The file extension used for changeset offset sidecar files. +pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff"; /// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a /// memory-mapped file. @@ -240,13 +242,22 @@ impl NippyJar { self.path.with_extension(CONFIG_FILE_EXTENSION) } + /// Returns the path for the changeset offsets sidecar file. + pub fn changeset_offsets_path(&self) -> PathBuf { + self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION) + } + /// Deletes from disk this [`NippyJar`] alongside every satellite file. pub fn delete(self) -> Result<(), NippyJarError> { // TODO(joshie): ensure consistency on unexpected shutdown - for path in - [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()] - { + for path in [ + self.data_path().into(), + self.index_path(), + self.offsets_path(), + self.config_path(), + self.changeset_offsets_path(), + ] { if path.exists() { debug!(target: "nippy-jar", ?path, "Removing file."); reth_fs_util::remove_file(path)?; diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index 96a4f715b5..802ad4099d 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -30,8 +30,10 @@ reth-nippy-jar.workspace = true reth-codecs.workspace = true reth-chain-state.workspace = true reth-node-types.workspace = true -reth-static-file-types.workspace = true +reth-static-file-types = { workspace = true, features = ["std"] } +reth-fs-util.workspace = true reth-tasks.workspace = true + # ethereum alloy-eips.workspace = true alloy-primitives.workspace = true diff --git a/crates/storage/provider/src/providers/static_file/jar.rs b/crates/storage/provider/src/providers/static_file/jar.rs index 04809742c8..4272b44f6b 100644 --- a/crates/storage/provider/src/providers/static_file/jar.rs +++ b/crates/storage/provider/src/providers/static_file/jar.rs @@ -17,6 +17,7 @@ use reth_db::static_file::{ use reth_db_api::table::{Decompress, Value}; use reth_node_types::NodePrimitives; use reth_primitives_traits::{SealedHeader, SignedTransaction}; +use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader}; use reth_storage_api::range_size_hint; use reth_storage_errors::provider::{ProviderError, ProviderResult}; use std::{ @@ -90,6 +91,63 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> { pub fn size(&self) -> usize { self.jar.value().size() } + + /// Reads a changeset offset from the sidecar file for a given block. + /// + /// Returns `None` if: + /// - The segment is not change-based + /// - The block is not in the block range + /// - The sidecar file doesn't exist + pub fn read_changeset_offset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + let header = self.user_header(); + if !header.segment().is_change_based() { + return Ok(None); + } + + let Some(index) = header.changeset_offset_index(block_number) else { + return Ok(None); + }; + + let csoff_path = self.data_path().with_extension("csoff"); + if !csoff_path.exists() { + return Ok(None); + } + + let len = header.changeset_offsets_len(); + let mut reader = + ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?; + reader.get(index).map_err(ProviderError::other) + } + + /// Reads all changeset offsets from the sidecar file. + /// + /// Returns `None` if: + /// - The segment is not change-based + /// - The sidecar file doesn't exist + pub fn read_changeset_offsets(&self) -> ProviderResult>> { + let header = self.user_header(); + if !header.segment().is_change_based() { + return Ok(None); + } + + let len = header.changeset_offsets_len(); + if len == 0 { + return Ok(Some(Vec::new())); + } + + let csoff_path = self.data_path().with_extension("csoff"); + if !csoff_path.exists() { + return Ok(None); + } + + let mut reader = + ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?; + let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; + Ok(Some(offsets)) + } } impl> HeaderProvider for StaticFileJarProvider<'_, N> { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 65da2e6217..6d4a5500fa 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -39,8 +39,8 @@ use reth_primitives_traits::{ use reth_prune_types::PruneSegment; use reth_stages_types::PipelineTarget; use reth_static_file_types::{ - find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap, - StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, + find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader, + SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE, }; use reth_storage_api::{ BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader, @@ -962,10 +962,10 @@ impl StaticFileProvider { ) -> ProviderResult { let fixed_block_range = self.find_fixed_range(segment, block); let key = (fixed_block_range.end(), segment); + let file = self.path.join(segment.filename(&fixed_block_range)); let jar = if let Some((_, jar)) = self.map.remove(&key) { jar.jar } else { - let file = self.path.join(segment.filename(&fixed_block_range)); debug!( target: "providers::static_file", ?file, @@ -977,6 +977,15 @@ impl StaticFileProvider { }; let header = jar.user_header().clone(); + + // Delete the sidecar file for changeset segments before deleting the main jar + if segment.is_change_based() { + let csoff_path = file.with_extension("csoff"); + if csoff_path.exists() { + std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?; + } + } + jar.delete().map_err(ProviderError::other)?; // SAFETY: this is currently necessary to ensure that certain indexes like @@ -2380,7 +2389,7 @@ impl ChangeSetReader for StaticFileProvider { Err(err) => return Err(err), }; - if let Some(offset) = provider.user_header().changeset_offset(block_number) { + if let Some(offset) = provider.read_changeset_offset(block_number)? { let mut cursor = provider.cursor()?; let mut changeset = Vec::with_capacity(offset.num_changes() as usize); @@ -2412,9 +2421,7 @@ impl ChangeSetReader for StaticFileProvider { Err(err) => return Err(err), }; - let user_header = provider.user_header(); - - let Some(offset) = user_header.changeset_offset(block_number) else { + let Some(offset) = provider.read_changeset_offset(block_number)? else { return Ok(None); }; @@ -2473,12 +2480,19 @@ impl ChangeSetReader for StaticFileProvider { fn account_changeset_count(&self) -> ProviderResult { let mut count = 0; - // iterate through static files and sum changeset metadata via each static file header let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?; if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) { - for (_, header) in changeset_segments { - if let Some(changeset_offsets) = header.changeset_offsets() { - for offset in changeset_offsets { + for (block_range, header) in changeset_segments { + let csoff_path = self + .path + .join(StaticFileSegment::AccountChangeSets.filename(block_range)) + .with_extension("csoff"); + if csoff_path.exists() { + let len = header.changeset_offsets_len(); + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; + for offset in offsets { count += offset.num_changes() as usize; } } @@ -2504,7 +2518,7 @@ impl StorageChangeSetReader for StaticFileProvider { Err(err) => return Err(err), }; - if let Some(offset) = provider.user_header().changeset_offset(block_number) { + if let Some(offset) = provider.read_changeset_offset(block_number)? { let mut cursor = provider.cursor()?; let mut changeset = Vec::with_capacity(offset.num_changes() as usize); @@ -2537,8 +2551,7 @@ impl StorageChangeSetReader for StaticFileProvider { Err(err) => return Err(err), }; - let user_header = provider.user_header(); - let Some(offset) = user_header.changeset_offset(block_number) else { + let Some(offset) = provider.read_changeset_offset(block_number)? else { return Ok(None); }; @@ -2595,9 +2608,17 @@ impl StorageChangeSetReader for StaticFileProvider { let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?; if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) { - for (_, header) in changeset_segments { - if let Some(changeset_offsets) = header.changeset_offsets() { - for offset in changeset_offsets { + for (block_range, header) in changeset_segments { + let csoff_path = self + .path + .join(StaticFileSegment::StorageChangeSets.filename(block_range)) + .with_extension("csoff"); + if csoff_path.exists() { + let len = header.changeset_offsets_len(); + let mut reader = ChangesetOffsetReader::new(&csoff_path, len) + .map_err(ProviderError::other)?; + let offsets = reader.get_range(0, len).map_err(ProviderError::other)?; + for offset in offsets { count += offset.num_changes() as usize; } } diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index 1f44c66c37..50cd204df2 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -11,6 +11,10 @@ mod writer; pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut}; mod metrics; + +#[cfg(test)] +mod writer_tests; + use reth_nippy_jar::NippyJar; use reth_static_file_types::{SegmentHeader, StaticFileSegment}; use reth_storage_errors::provider::{ProviderError, ProviderResult}; @@ -744,8 +748,9 @@ mod tests { .unwrap(); // Check that the segment header has changeset offsets - assert!(provider.user_header().changeset_offsets().is_some()); - let offsets = provider.user_header().changeset_offsets().unwrap(); + let offsets = provider.read_changeset_offsets().unwrap(); + assert!(offsets.is_some()); + let offsets = offsets.unwrap(); assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets // Verify each block has the expected number of changes @@ -855,7 +860,8 @@ mod tests { let (static_dir, _) = create_test_static_files_dir(); let blocks_per_file = 10; - let files_per_range = 3; + // 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments + let files_per_range = 4; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count; let tip = blocks_per_file * file_set_count - 1; @@ -924,7 +930,7 @@ mod tests { )?; // Check offsets are valid - let offsets = provider.user_header().changeset_offsets(); + let offsets = provider.read_changeset_offsets().unwrap(); assert!(offsets.is_some(), "Should have changeset offsets"); } @@ -1088,8 +1094,9 @@ mod tests { .unwrap(); // Check that the segment header has changeset offsets - assert!(provider.user_header().changeset_offsets().is_some()); - let offsets = provider.user_header().changeset_offsets().unwrap(); + let offsets = provider.read_changeset_offsets().unwrap(); + assert!(offsets.is_some()); + let offsets = offsets.unwrap(); assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets // Verify each block has the expected number of changes @@ -1190,7 +1197,8 @@ mod tests { let (static_dir, _) = create_test_static_files_dir(); let blocks_per_file = 10; - let files_per_range = 3; + // 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments + let files_per_range = 4; let file_set_count = 3; let initial_file_count = files_per_range * file_set_count; let tip = blocks_per_file * file_set_count - 1; @@ -1249,7 +1257,7 @@ mod tests { tip, None, )?; - let offsets = provider.user_header().changeset_offsets(); + let offsets = provider.read_changeset_offsets()?; assert!(offsets.is_some(), "Should have changeset offsets"); } @@ -1352,4 +1360,42 @@ mod tests { } } } + + #[test] + fn test_last_block_flushed_on_commit() { + let (static_dir, _) = create_test_static_files_dir(); + + let sf_rw = StaticFileProvider::::read_write(&static_dir) + .expect("Failed to create static file provider"); + + let address = Address::from([5u8; 20]); + let key = B256::with_last_byte(1); + + // Write changes for a single block without calling increment_block explicitly + // (append_storage_changeset calls it internally), then commit + { + let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap(); + + // Append a single block's changeset (block 0) + writer + .append_storage_changeset( + vec![StorageBeforeTx { address, key, value: U256::from(42) }], + 0, + ) + .unwrap(); + + // Commit without any subsequent block - the current block's offset should be flushed + writer.commit().unwrap(); + } + + // Verify highest block is 0 + let highest = sf_rw.get_highest_static_file_block(StaticFileSegment::StorageChangeSets); + assert_eq!(highest, Some(0), "Should have block 0 after commit"); + + // Verify the data is actually readable via the high-level API + let result = sf_rw.get_storage_before_block(0, address, key).unwrap(); + assert!(result.is_some(), "Should be able to read the changeset entry"); + let entry = result.unwrap(); + assert_eq!(entry.value, U256::from(42)); + } } diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 5c16293da7..1d0c3fbf2b 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -10,7 +10,10 @@ use reth_db::models::{AccountBeforeTx, StorageBeforeTx}; use reth_db_api::models::CompactU256; use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter}; use reth_node_types::NodePrimitives; -use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment}; +use reth_static_file_types::{ + ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader, + SegmentRangeInclusive, StaticFileSegment, +}; use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError}; use std::{ borrow::Borrow, @@ -232,6 +235,10 @@ pub struct StaticFileProviderRW { prune_on_commit: Option, /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs. synced: bool, + /// Changeset offsets sidecar writer (only for changeset segments). + changeset_offsets: Option, + /// Current block's changeset offset being written. + current_changeset_offset: Option, } impl StaticFileProviderRW { @@ -246,6 +253,8 @@ impl StaticFileProviderRW { metrics: Option>, ) -> ProviderResult { let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?; + + // Create writer WITHOUT sidecar first - we'll add it after healing let mut writer = Self { writer, data_path, @@ -254,10 +263,19 @@ impl StaticFileProviderRW { metrics, prune_on_commit: None, synced: false, + changeset_offsets: None, + current_changeset_offset: None, }; + // Run NippyJar healing BEFORE setting up changeset sidecar + // This may reduce rows, which affects valid sidecar offsets writer.ensure_end_range_consistency()?; + // Now set up changeset sidecar with post-heal header values + if segment.is_change_based() { + writer.heal_changeset_sidecar()?; + } + Ok(writer) } @@ -349,7 +367,164 @@ impl StaticFileProviderRW { self.prune_on_commit.is_some() } - /// Syncs all data (rows and offsets) to disk. + /// Heals the changeset offset sidecar after `NippyJar` healing. + /// + /// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows. + /// Performs three-way consistency check between header, `NippyJar` rows, and sidecar file: + /// - Validates sidecar offsets don't point past actual `NippyJar` rows + /// - Heals header if sidecar was truncated during interrupted prune + /// - Truncates sidecar if offsets point past healed `NippyJar` data + fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> { + let csoff_path = self.data_path.with_extension("csoff"); + + // Step 1: Read all three sources of truth + let header_claims_blocks = self.writer.user_header().changeset_offsets_len(); + let actual_nippy_rows = self.writer.rows() as u64; + + // Get actual sidecar file size (may differ from header after crash) + let actual_sidecar_blocks = if csoff_path.exists() { + let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len(); + // Remove partial records from crash mid-write + let aligned_len = file_len - (file_len % 16); + aligned_len / 16 + } else { + 0 + }; + + // Fresh segment or no sidecar data - nothing to heal + if header_claims_blocks == 0 && actual_sidecar_blocks == 0 { + self.changeset_offsets = + Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?); + return Ok(()); + } + + // Step 2: Validate sidecar offsets against actual NippyJar state + let valid_blocks = if actual_sidecar_blocks > 0 { + let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks) + .map_err(ProviderError::other)?; + + // Find last block where offset + num_changes <= actual_nippy_rows + // This correctly handles rows=0 with offset=0, num_changes=0 (empty blocks) + let mut valid = 0u64; + for i in 0..actual_sidecar_blocks { + if let Some(offset) = reader.get(i).map_err(ProviderError::other)? { + if offset.offset() + offset.num_changes() <= actual_nippy_rows { + valid = i + 1; + } else { + // This block points past EOF - stop here + break; + } + } + } + valid + } else { + 0 + }; + + // Step 3: Determine correct state from synced files (source of truth) + // Header is the commit marker - never enlarge, only shrink + let correct_blocks = valid_blocks.min(header_claims_blocks); + + // Step 4: Heal if header doesn't match validated truth + let mut needs_header_commit = false; + + if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks { + tracing::warn!( + target: "reth::static_file", + path = %csoff_path.display(), + header_claims = header_claims_blocks, + sidecar_has = actual_sidecar_blocks, + valid_blocks = correct_blocks, + actual_rows = actual_nippy_rows, + "Three-way healing: syncing header, sidecar, and NippyJar state" + ); + + // Truncate sidecar file if it has invalid blocks + if actual_sidecar_blocks > correct_blocks { + use std::fs::OpenOptions; + let file = OpenOptions::new() + .write(true) + .open(&csoff_path) + .map_err(ProviderError::other)?; + file.set_len(correct_blocks * 16).map_err(ProviderError::other)?; + file.sync_all().map_err(ProviderError::other)?; + + tracing::debug!( + target: "reth::static_file", + "Truncated sidecar from {} to {} blocks", + actual_sidecar_blocks, + correct_blocks + ); + } + + // Update header to match validated truth (can only shrink, never enlarge) + if correct_blocks < header_claims_blocks { + // Blocks were removed - use prune() to update both block_range and + // changeset_offsets_len atomically + let blocks_removed = header_claims_blocks - correct_blocks; + self.writer.user_header_mut().prune(blocks_removed); + + tracing::debug!( + target: "reth::static_file", + "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})", + blocks_removed, + header_claims_blocks, + correct_blocks + ); + + needs_header_commit = true; + } + } else { + tracing::debug!( + target: "reth::static_file", + path = %csoff_path.display(), + blocks = correct_blocks, + "Changeset sidecar consistent, no healing needed" + ); + } + + // Open sidecar writer with corrected count (won't error now that sizes match) + let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks) + .map_err(ProviderError::other)?; + + self.changeset_offsets = Some(csoff_writer); + + // Commit healed header if needed (after sidecar writer is set up) + if needs_header_commit { + self.writer.commit().map_err(ProviderError::other)?; + + tracing::info!( + target: "reth::static_file", + path = %csoff_path.display(), + blocks = correct_blocks, + "Committed healed changeset offset header" + ); + } + + Ok(()) + } + + /// Flushes the current changeset offset (if any) to the `.csoff` sidecar file. + /// + /// This is idempotent - safe to call multiple times. After flushing, the current offset + /// is cleared to prevent duplicate writes. + /// + /// This must be called before committing or syncing to ensure the last block's offset + /// is persisted, since `increment_block()` only writes the *previous* block's offset. + fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> { + if !self.writer.user_header().segment().is_change_based() { + return Ok(()); + } + + if let Some(offset) = self.current_changeset_offset.take() && + let Some(writer) = &mut self.changeset_offsets + { + writer.append(&offset).map_err(ProviderError::other)?; + } + Ok(()) + } + + /// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk. /// /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the /// configuration and mark the writer as clean. @@ -359,6 +534,15 @@ impl StaticFileProviderRW { if self.prune_on_commit.is_some() { return Err(StaticFileWriterError::FinalizeWithPruneQueued.into()); } + + // Write the final block's offset and sync the sidecar for changeset segments + self.flush_current_changeset_offset()?; + if let Some(writer) = &mut self.changeset_offsets { + writer.sync().map_err(ProviderError::other)?; + // Update the header with the actual number of offsets written + self.writer.user_header_mut().set_changeset_offsets_len(writer.len()); + } + if self.writer.is_dirty() { self.writer.sync_all().map_err(ProviderError::other)?; } @@ -383,7 +567,9 @@ impl StaticFileProviderRW { } if self.writer.is_dirty() { if !self.synced { - self.writer.sync_all().map_err(ProviderError::other)?; + // Must call self.sync_all() to flush changeset offsets and update + // the header's changeset_offsets_len, not just the inner writer + self.sync_all()?; } self.writer.finalize().map_err(ProviderError::other)?; @@ -430,6 +616,15 @@ impl StaticFileProviderRW { } } + // For changeset segments, flush and sync the sidecar file before committing the main file. + // This ensures crash consistency: the sidecar is durable before the header references it. + self.flush_current_changeset_offset()?; + if let Some(writer) = &mut self.changeset_offsets { + writer.sync().map_err(ProviderError::other)?; + // Update the header with the actual number of offsets written + self.writer.user_header_mut().set_changeset_offsets_len(writer.len()); + } + if self.writer.is_dirty() { debug!( target: "providers::static_file", @@ -573,7 +768,15 @@ impl StaticFileProviderRW { let (writer, data_path) = Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?; self.writer = writer; - self.data_path = data_path; + self.data_path = data_path.clone(); + + // Update changeset offsets writer for the new file (starts empty) + if segment.is_change_based() { + let csoff_path = data_path.with_extension("csoff"); + self.changeset_offsets = Some( + ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?, + ); + } *self.writer.user_header_mut() = SegmentHeader::new( self.reader().find_fixed_range(segment, last_block + 1), @@ -585,6 +788,20 @@ impl StaticFileProviderRW { } self.writer.user_header_mut().increment_block(); + + // Handle changeset offset tracking for changeset segments + if segment.is_change_based() { + // Write previous block's offset if we have one + if let Some(offset) = self.current_changeset_offset.take() && + let Some(writer) = &mut self.changeset_offsets + { + writer.append(&offset).map_err(ProviderError::other)?; + } + // Start tracking new block's offset + let new_offset = self.writer.rows() as u64; + self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0)); + } + if let Some(metrics) = &self.metrics { metrics.record_segment_operation( segment, @@ -656,13 +873,6 @@ impl StaticFileProviderRW { expected_block_start = self.writer.user_header().expected_block_start(); } - // Now we're in the correct file, we need to find how many rows to prune - // We need to iterate through the changesets to find the correct position - // Since changesets are stored per block, we need to find the offset for the block - let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| { - ProviderError::other(StaticFileWriterError::new("Missing changeset offsets")) - })?; - // Find the number of rows to keep (up to and including last_block) let blocks_to_keep = if last_block >= expected_block_start { last_block - expected_block_start + 1 @@ -670,18 +880,30 @@ impl StaticFileProviderRW { 0 }; + // Read changeset offsets from sidecar file to find where to truncate + let csoff_path = self.data_path.with_extension("csoff"); + let changeset_offsets_len = self.writer.user_header().changeset_offsets_len(); + + // Flush any pending changeset offset before reading the sidecar + self.flush_current_changeset_offset()?; + let rows_to_keep = if blocks_to_keep == 0 { 0 - } else if blocks_to_keep as usize > changeset_offsets.len() { - // Keep all rows in this file (shouldn't happen if data is consistent) - self.writer.rows() as u64 - } else if blocks_to_keep as usize == changeset_offsets.len() { - // Keep all rows + } else if blocks_to_keep >= changeset_offsets_len { + // Keep all rows in this file self.writer.rows() as u64 } else { - // Find the offset for the block after last_block - // This gives us the number of rows to keep - changeset_offsets[blocks_to_keep as usize].offset() + // Read offset for the block after last_block from sidecar. + // Use committed length from header, ignoring any uncommitted records + // that may exist in the file after a crash. + let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len) + .map_err(ProviderError::other)?; + if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? { + next_offset.offset() + } else { + // If we can't read the offset, keep all rows + self.writer.rows() as u64 + } }; let total_rows = self.writer.rows() as u64; @@ -709,6 +931,14 @@ impl StaticFileProviderRW { // Sync changeset offsets to match the new block range self.writer.user_header_mut().sync_changeset_offsets(); + // Truncate the sidecar file to match the new block count + if let Some(writer) = &mut self.changeset_offsets { + writer.truncate(blocks_to_keep).map_err(ProviderError::other)?; + } + + // Clear current changeset offset tracking since we've pruned + self.current_changeset_offset = None; + // Commits new changes to disk self.commit()?; @@ -789,16 +1019,36 @@ impl StaticFileProviderRW { /// Delete the current static file, and replace this provider writer with the previous static /// file. fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> { + let segment = self.user_header().segment(); let current_path = self.data_path.clone(); let (previous_writer, data_path) = Self::open( - self.user_header().segment(), + segment, self.writer.user_header().expected_block_start() - 1, self.reader.clone(), self.metrics.clone(), )?; self.writer = previous_writer; self.writer.set_dirty(); - self.data_path = data_path; + self.data_path = data_path.clone(); + + // Delete the sidecar file for changeset segments before deleting the main jar + if segment.is_change_based() { + let csoff_path = current_path.with_extension("csoff"); + if csoff_path.exists() { + std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?; + } + // Re-initialize the changeset offsets writer for the previous file + let new_csoff_path = data_path.with_extension("csoff"); + let committed_len = self.writer.user_header().changeset_offsets_len(); + self.changeset_offsets = Some( + ChangesetOffsetWriter::new(&new_csoff_path, committed_len) + .map_err(ProviderError::other)?, + ); + } + + // Clear current changeset offset tracking since we're switching files + self.current_changeset_offset = None; + NippyJar::::load(¤t_path) .map_err(ProviderError::other)? .delete() @@ -842,10 +1092,9 @@ impl StaticFileProviderRW { /// Appends change to changeset static file. fn append_change(&mut self, change: &V) -> ProviderResult<()> { - if self.writer.user_header().changeset_offsets().is_some() { - self.writer.user_header_mut().increment_block_changes(); + if let Some(ref mut offset) = self.current_changeset_offset { + offset.increment_num_changes(); } - self.append_column(change)?; Ok(()) } diff --git a/crates/storage/provider/src/providers/static_file/writer_tests.rs b/crates/storage/provider/src/providers/static_file/writer_tests.rs new file mode 100644 index 0000000000..8f75ea5529 --- /dev/null +++ b/crates/storage/provider/src/providers/static_file/writer_tests.rs @@ -0,0 +1,812 @@ +//! Crash recovery tests for changeset offset healing. +//! +//! These tests verify the three-way consistency healing logic between: +//! 1. Header (`SegmentHeader.changeset_offsets_len`) +//! 2. `NippyJar` rows (actual row count) +//! 3. Sidecar file (.csoff) +//! +//! All tests use real `NippyJar` files via `StaticFileProvider` to test the full healing path. + +#[cfg(test)] +mod tests { + use alloy_primitives::{Address, U256}; + use reth_db::{models::AccountBeforeTx, test_utils::create_test_static_files_dir}; + use reth_primitives_traits::Account; + use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader, StaticFileSegment}; + use std::{fs::OpenOptions, io::Write as _, path::PathBuf}; + + use crate::providers::{ + static_file::manager::{StaticFileProviderBuilder, StaticFileWriter}, + StaticFileProvider, + }; + use reth_chain_state::EthPrimitives; + + // ==================== HELPER FUNCTIONS ==================== + + /// Creates a `StaticFileProvider` for testing with the given `blocks_per_file` setting. + fn setup_test_provider( + static_dir: &tempfile::TempDir, + blocks_per_file: u64, + ) -> StaticFileProvider { + StaticFileProviderBuilder::read_write(static_dir) + .with_blocks_per_file(blocks_per_file) + .build() + .expect("Failed to build static file provider") + } + + /// Generates test changeset data for a block. + fn generate_test_changeset(block_num: u64, num_changes: usize) -> Vec { + (0..num_changes) + .map(|i| { + let mut address = Address::ZERO; + address.0[0] = block_num as u8; + address.0[1] = i as u8; + AccountBeforeTx { + address, + info: Some(Account { + nonce: block_num, + balance: U256::from(block_num * 1000 + i as u64), + bytecode_hash: None, + }), + } + }) + .collect() + } + + /// Writes test blocks to the `AccountChangeSets` segment. + /// Returns the path to the sidecar file. + fn write_test_blocks( + provider: &StaticFileProvider, + num_blocks: u64, + changes_per_block: usize, + ) -> PathBuf { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + for block_num in 0..num_blocks { + let changeset = generate_test_changeset(block_num, changes_per_block); + writer.append_account_changeset(changeset, block_num).unwrap(); + } + + writer.commit().unwrap(); + + // Return path to sidecar + get_sidecar_path(provider, 0) + } + + /// Gets the .csoff sidecar path for a given block. + fn get_sidecar_path(provider: &StaticFileProvider, block: u64) -> PathBuf { + let range = provider.find_fixed_range(StaticFileSegment::AccountChangeSets, block); + let filename = StaticFileSegment::AccountChangeSets.filename(&range); + provider.directory().join(filename).with_extension("csoff") + } + + /// Reads the block count from a sidecar file (file size / 16). + fn get_sidecar_block_count(path: &PathBuf) -> u64 { + if !path.exists() { + return 0; + } + let metadata = std::fs::metadata(path).unwrap(); + metadata.len() / 16 + } + + /// Appends a partial record to sidecar (simulates crash mid-write). + fn corrupt_sidecar_partial_write(path: &PathBuf, partial_bytes: usize) { + let mut file = OpenOptions::new().append(true).open(path).unwrap(); + file.write_all(&vec![0u8; partial_bytes]).unwrap(); + file.sync_all().unwrap(); + } + + /// Appends fake blocks to sidecar that point past actual `NippyJar` rows. + fn corrupt_sidecar_add_fake_blocks(path: &PathBuf, num_fake_blocks: u64, start_offset: u64) { + let mut file = OpenOptions::new().append(true).open(path).unwrap(); + for i in 0..num_fake_blocks { + let offset = ChangesetOffset::new(start_offset + i * 5, 5); + let mut buf = [0u8; 16]; + buf[..8].copy_from_slice(&offset.offset().to_le_bytes()); + buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes()); + file.write_all(&buf).unwrap(); + } + file.sync_all().unwrap(); + } + + /// Truncates the sidecar file to a specific number of blocks. + fn truncate_sidecar(path: &PathBuf, num_blocks: u64) { + let file = OpenOptions::new().write(true).open(path).unwrap(); + file.set_len(num_blocks * 16).unwrap(); + file.sync_all().unwrap(); + } + + /// Reads the `changeset_offsets_len` from the segment header. + fn get_header_block_count(provider: &StaticFileProvider, block: u64) -> u64 { + let jar_provider = provider + .get_segment_provider_for_block(StaticFileSegment::AccountChangeSets, block, None) + .unwrap(); + jar_provider.user_header().changeset_offsets_len() + } + + /// Gets the actual row count from `NippyJar`. + fn get_nippy_row_count(provider: &StaticFileProvider, block: u64) -> u64 { + let jar_provider = provider + .get_segment_provider_for_block(StaticFileSegment::AccountChangeSets, block, None) + .unwrap(); + jar_provider.rows() as u64 + } + + // ==================== APPEND CRASH SCENARIOS ==================== + + #[test] + fn test_append_crash_partial_sidecar_record() { + // SCENARIO: Crash mid-write of a 16-byte sidecar record. + // State after crash: Sidecar has N complete records + partial bytes. + // Expected: Healing truncates partial bytes, keeps N complete records. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks with 3 changes each + let sidecar_path = write_test_blocks(&provider, 5, 3); + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + + // Corrupt: append partial record (8 of 16 bytes) + corrupt_sidecar_partial_write(&sidecar_path, 8); + assert_eq!( + std::fs::metadata(&sidecar_path).unwrap().len(), + 5 * 16 + 8, + "Should have 5 records + 8 partial bytes" + ); + + // Reopen provider - triggers healing + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + + // Verify healing truncated partial record + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Should have 5 complete records"); + assert_eq!( + std::fs::metadata(&sidecar_path).unwrap().len(), + 5 * 16, + "File should be exactly 80 bytes" + ); + } + + #[test] + fn test_append_crash_sidecar_synced_header_not_committed() { + // SCENARIO: Sidecar was synced with new blocks, but header commit crashed. + // State after crash: Sidecar has more blocks than header claims. + // Expected: Healing clamps to header value (never enlarges header). + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks + let sidecar_path = write_test_blocks(&provider, 5, 3); + let total_rows = 5 * 3; // 15 rows + + // Corrupt: add 3 fake blocks to sidecar (simulates sidecar sync before header commit) + // These blocks point to valid rows (within the 15 rows we have) + // But header doesn't know about them + corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, total_rows as u64); + assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar should have 8 blocks"); + + // Reopen - healing should clamp to header's 5 blocks + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Header is authoritative - sidecar should be truncated to 5 + assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Should clamp to header value"); + assert_eq!(get_header_block_count(&provider, 0), 5); + } + + #[test] + fn test_append_crash_sidecar_ahead_of_nippy_offsets() { + // APPEND CP2: After data sync, before offsets sync. + // SCENARIO: Sidecar was synced first (has new blocks), data file has new rows, + // but NippyJar offsets file is stale. Config is stale. + // + // We can't easily simulate NippyJar internal offset mismatch, but we CAN test + // that sidecar entries are validated against actual_nippy_rows (from NippyJar.rows()). + // If sidecar claims blocks that would exceed NippyJar's row count, healing truncates. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks with 3 changes each (15 rows total) + let sidecar_path = write_test_blocks(&provider, 5, 3); + assert_eq!(get_nippy_row_count(&provider, 0), 15); + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + + // Corrupt sidecar: add 3 fake blocks that claim to point to rows beyond NippyJar's 15 + // Block 6: offset=15, count=3 (rows 15-17, but only 15 rows exist!) + // Block 7: offset=18, count=3 (rows 18-20, invalid) + // Block 8: offset=21, count=3 (rows 21-23, invalid) + corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 15); + assert_eq!(get_sidecar_block_count(&sidecar_path), 8); + + // Reopen - healing should detect sidecar offsets point past actual NippyJar rows + // and truncate back. Since header is 5, healing clamps to min(8, 5) = 5. + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Sidecar should be clamped to header's 5 blocks + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + assert_eq!(get_header_block_count(&provider, 0), 5); + // NippyJar rows unchanged + assert_eq!(get_nippy_row_count(&provider, 0), 15); + } + + #[test] + fn test_append_clean_no_crash() { + // BASELINE: Normal append with no crash. + // All three sources should be in sync. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks with 5 changes each + let sidecar_path = write_test_blocks(&provider, 10, 5); + + // Verify all in sync + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + assert_eq!(get_header_block_count(&provider, 0), 10); + assert_eq!(get_nippy_row_count(&provider, 0), 50); // 10 blocks * 5 changes + + // Reopen multiple times - should remain stable + drop(provider); + for _ in 0..3 { + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + } + } + + // ==================== PRUNE CRASH SCENARIOS ==================== + + #[test] + fn test_prune_crash_sidecar_truncated_header_stale() { + // SCENARIO: Prune truncated sidecar but crashed before header commit. + // State after crash: Sidecar has fewer blocks than header claims. + // Expected: Healing updates header to match sidecar. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks + let sidecar_path = write_test_blocks(&provider, 10, 5); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + + // Simulate prune crash: truncate sidecar to 7 blocks but don't update header + // (In real crash, header would still claim 10 blocks) + truncate_sidecar(&sidecar_path, 7); + assert_eq!(get_sidecar_block_count(&sidecar_path), 7); + + // Reopen - healing should detect sidecar < header and fix header + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + { + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + // Writer commits healed header on open + } + // Drop writer and reopen provider to see committed changes + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + + // After healing, header should match sidecar + assert_eq!(get_sidecar_block_count(&sidecar_path), 7); + assert_eq!(get_header_block_count(&provider, 0), 7); + } + + #[test] + fn test_prune_crash_sidecar_offsets_past_nippy_rows() { + // SCENARIO: NippyJar was pruned but sidecar wasn't truncated. + // State after crash: Sidecar has offsets pointing past actual NippyJar rows. + // Expected: Healing validates offsets and truncates invalid blocks. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks with 5 changes each (50 rows total) + let sidecar_path = write_test_blocks(&provider, 10, 5); + + // Verify initial state + assert_eq!(get_nippy_row_count(&provider, 0), 50); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + + // Now add fake blocks to sidecar that point past row 50 + // These simulate a crash where sidecar wasn't cleaned up after NippyJar prune + corrupt_sidecar_add_fake_blocks(&sidecar_path, 5, 50); // Blocks pointing to rows 50-74 + assert_eq!(get_sidecar_block_count(&sidecar_path), 15); + + // Reopen - healing should detect invalid offsets and truncate + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Should be back to 10 valid blocks (offsets pointing within 50 rows) + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + assert_eq!(get_header_block_count(&provider, 0), 10); + } + + #[test] + fn test_prune_crash_nippy_offsets_truncated_data_stale() { + // PRUNE CP1-3: Tests healing when sidecar has offsets past NippyJar.rows(). + // NippyJar heals internally, then changeset healing validates against rows(). + // Verifies healing uses actual_nippy_rows as source of truth. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks with varying changes + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + for block_num in 0..10 { + let changes = (block_num % 5 + 1) as usize; // 1-5 changes per block + writer + .append_account_changeset( + generate_test_changeset(block_num, changes), + block_num, + ) + .unwrap(); + } + writer.commit().unwrap(); + } + + let sidecar_path = get_sidecar_path(&provider, 0); + let actual_rows = get_nippy_row_count(&provider, 0); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + + // Simulate a scenario where sidecar has blocks pointing past actual rows. + // Add 2 fake blocks that would exceed the row count. + // Block 11: offset=actual_rows, count=5 (pointing past EOF) + corrupt_sidecar_add_fake_blocks(&sidecar_path, 2, actual_rows); + assert_eq!(get_sidecar_block_count(&sidecar_path), 12); + + // Reopen - healing validates all offsets against NippyJar.rows() + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // After healing: sidecar clamped to header (10), rows unchanged + assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Sidecar clamped to header"); + assert_eq!(get_header_block_count(&provider, 0), 10); + assert_eq!(get_nippy_row_count(&provider, 0), actual_rows, "NippyJar rows unchanged"); + } + + #[test] + fn test_prune_crash_sidecar_truncate_not_synced() { + // PRUNE CP6: Sidecar truncated but not synced (power loss could resurrect old length). + // SCENARIO: We pruned from 10 to 7 blocks. Header was updated to 7. + // Sidecar was truncated to 7 but fsync didn't complete before power loss. + // After restart, filesystem resurrects sidecar back to 10 (old length). + // + // State after crash: Header says 7, sidecar shows 10. + // Expected: Healing clamps sidecar to header's 7 (header is authoritative). + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks with 5 changes each + let sidecar_path = write_test_blocks(&provider, 10, 5); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + assert_eq!(get_header_block_count(&provider, 0), 10); + + // Prune to 7 blocks (keep blocks 0-6) + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + writer.prune_account_changesets(6).unwrap(); + writer.commit().unwrap(); + } + + // Verify prune worked + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + assert_eq!(get_header_block_count(&provider, 0), 7); + assert_eq!(get_sidecar_block_count(&sidecar_path), 7); + + // Now write 3 more blocks (back to 10 total) + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + for block_num in 7..10 { + let changeset = generate_test_changeset(block_num, 5); + writer.append_account_changeset(changeset, block_num).unwrap(); + } + writer.commit().unwrap(); + } + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + assert_eq!(get_header_block_count(&provider, 0), 10); + + // Prune back to 7 again + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + writer.prune_account_changesets(6).unwrap(); + writer.commit().unwrap(); + } + drop(provider); + + // SIMULATE POWER LOSS: Restore sidecar to 10 blocks (as if truncate wasn't synced) + // Extend sidecar back to 10 blocks to simulate "resurrected" state + corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 35); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Simulated resurrected sidecar"); + + // Reopen - healing should detect sidecar (10) > header (7) and clamp + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Header is authoritative - sidecar must be clamped to 7 + assert_eq!(get_sidecar_block_count(&sidecar_path), 7, "Sidecar clamped to header"); + assert_eq!(get_header_block_count(&provider, 0), 7, "Header unchanged"); + } + + #[test] + fn test_prune_with_unflushed_current_offset() { + // REGRESSION (Issue #7): truncate_changesets() didn't call flush_current_changeset_offset() + // before reading the sidecar. This caused incorrect truncation when there was an unflushed + // current_changeset_offset from recent appends. + // + // Test scenario: Prune immediately after appending without committing first. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + let sidecar_path = get_sidecar_path(&provider, 0); + + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Step 1: Write 5 blocks (0-4), commit + for block_num in 0..5u64 { + let changeset = generate_test_changeset(block_num, 3); + writer.append_account_changeset(changeset, block_num).unwrap(); + } + writer.commit().unwrap(); + + // Step 2: Append 5 more blocks (5-9) WITHOUT committing + for block_num in 5..10u64 { + let changeset = generate_test_changeset(block_num, 3); + writer.append_account_changeset(changeset, block_num).unwrap(); + } + // Note: NOT committing here - block 9's offset is only in current_changeset_offset + + // Step 3: Prune to block 7 (should keep blocks 0-7, including uncommitted 5-7) + // This tests that the unflushed current_changeset_offset (for block 9) is properly + // flushed before reading the sidecar during truncation. + writer.prune_account_changesets(7).unwrap(); + + // Step 4: Commit + writer.commit().unwrap(); + } + + // Step 5: Verify sidecar has 8 blocks, header has 8 blocks, rows are correct + assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar should have 8 blocks (0-7)"); + assert_eq!(get_header_block_count(&provider, 0), 8, "Header should have 8 blocks"); + assert_eq!( + get_nippy_row_count(&provider, 0), + 24, + "Should have 8 blocks * 3 changes = 24 rows" + ); + + // Verify stability after reopen + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar stable after reopen"); + assert_eq!(get_header_block_count(&provider, 0), 8, "Header stable after reopen"); + assert_eq!(get_nippy_row_count(&provider, 0), 24, "Rows stable after reopen"); + } + + #[test] + fn test_prune_clean_no_crash() { + // BASELINE: Normal prune with no crash. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks + write_test_blocks(&provider, 10, 5); + + // Prune to keep blocks 0-6 (7 blocks total, removing blocks 7-9) + // prune_account_changesets takes last_block to keep, not count to remove + let sidecar_path = get_sidecar_path(&provider, 0); + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + writer.prune_account_changesets(6).unwrap(); // Keep blocks 0-6 + writer.commit().unwrap(); + } + + // Reopen provider to see committed changes + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + + // Verify all in sync at 7 blocks (0-6) + assert_eq!(get_sidecar_block_count(&sidecar_path), 7); + assert_eq!(get_header_block_count(&provider, 0), 7); + + // Rows should also be reduced (7 blocks * 5 changes = 35 rows) + assert_eq!(get_nippy_row_count(&provider, 0), 35); + } + + // ==================== VALIDATION EDGE CASES ==================== + + #[test] + fn test_empty_segment_fresh_start() { + // SCENARIO: Brand new segment with no data. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Just open a writer without writing anything + { + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + } + + // Sidecar might not exist or be empty + let sidecar_path = get_sidecar_path(&provider, 0); + let block_count = get_sidecar_block_count(&sidecar_path); + assert_eq!(block_count, 0, "Fresh segment should have 0 blocks"); + } + + #[test] + fn test_all_empty_blocks_preserved_on_reopen() { + // REGRESSION: When all blocks have empty changesets (0 rows in NippyJar), + // healing incorrectly pruned all blocks because the validation was skipped + // when actual_nippy_rows == 0. But (offset=0, num_changes=0) is valid when rows=0. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks with 0 changes each => 0 total rows in NippyJar + let sidecar_path = write_test_blocks(&provider, 5, 0); + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + assert_eq!(get_header_block_count(&provider, 0), 5); + assert_eq!(get_nippy_row_count(&provider, 0), 0); + + // Reopen - healing must preserve all 5 blocks + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Must still have 5 blocks after healing + assert_eq!( + get_sidecar_block_count(&sidecar_path), + 5, + "Healing should not prune empty blocks" + ); + assert_eq!(get_header_block_count(&provider, 0), 5); + assert_eq!(get_nippy_row_count(&provider, 0), 0); + } + + #[test] + fn test_empty_blocks_zero_changes() { + // SCENARIO: Some blocks have 0 changes (empty changesets). + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + // Block 0: 3 changes + writer.append_account_changeset(generate_test_changeset(0, 3), 0).unwrap(); + + // Block 1: 0 changes (empty) + writer.append_account_changeset(vec![], 1).unwrap(); + + // Block 2: 2 changes + writer.append_account_changeset(generate_test_changeset(2, 2), 2).unwrap(); + + // Block 3: 0 changes (empty) + writer.append_account_changeset(vec![], 3).unwrap(); + + // Block 4: 5 changes + writer.append_account_changeset(generate_test_changeset(4, 5), 4).unwrap(); + + writer.commit().unwrap(); + } + + let sidecar_path = get_sidecar_path(&provider, 0); + + // Verify 5 blocks recorded + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + assert_eq!(get_header_block_count(&provider, 0), 5); + + // Verify offsets are correct + let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap(); + + let o0 = reader.get(0).unwrap().unwrap(); + assert_eq!(o0.offset(), 0); + assert_eq!(o0.num_changes(), 3); + + let o1 = reader.get(1).unwrap().unwrap(); + assert_eq!(o1.offset(), 3); + assert_eq!(o1.num_changes(), 0); // Empty block + + let o2 = reader.get(2).unwrap().unwrap(); + assert_eq!(o2.offset(), 3); // Same offset as block 1 (0 changes didn't advance) + assert_eq!(o2.num_changes(), 2); + + let o3 = reader.get(3).unwrap().unwrap(); + assert_eq!(o3.offset(), 5); + assert_eq!(o3.num_changes(), 0); // Empty block + + let o4 = reader.get(4).unwrap().unwrap(); + assert_eq!(o4.offset(), 5); + assert_eq!(o4.num_changes(), 5); + + // Total rows: 3 + 0 + 2 + 0 + 5 = 10 + assert_eq!(get_nippy_row_count(&provider, 0), 10); + + // Reopen and verify healing doesn't break empty blocks + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + } + + #[test] + fn test_healing_never_enlarges_header() { + // INVARIANT: Header is the commit marker. Healing should NEVER enlarge it. + // Even if sidecar has more valid blocks, we trust header. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks + let sidecar_path = write_test_blocks(&provider, 5, 3); + + // Add more blocks to sidecar that would be "valid" (point within existing rows) + // These simulate uncommitted blocks from a crashed append + corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 0); + assert_eq!(get_sidecar_block_count(&sidecar_path), 8); + + // Reopen - healing should clamp to header's 5, not enlarge to 8 + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Must clamp to header"); + assert_eq!(get_header_block_count(&provider, 0), 5, "Header unchanged"); + } + + #[test] + fn test_multiple_reopen_cycles_stable() { + // STABILITY: Opening and closing multiple times shouldn't change anything. + + let (static_dir, _) = create_test_static_files_dir(); + + // Initial write + { + let provider = setup_test_provider(&static_dir, 100); + write_test_blocks(&provider, 10, 5); + } + + // Reopen 5 times + for i in 0..5 { + let provider = setup_test_provider(&static_dir, 100); + let sidecar_path = get_sidecar_path(&provider, 0); + + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Cycle {}: block count", i); + assert_eq!(get_header_block_count(&provider, 0), 10, "Cycle {}: header", i); + assert_eq!(get_nippy_row_count(&provider, 0), 50, "Cycle {}: rows", i); + } + } + + #[test] + fn test_combined_partial_and_extra_blocks() { + // COMBINED: Partial record AND extra complete blocks. + // Healing should handle both: truncate partial, then validate remaining. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 5 blocks + let sidecar_path = write_test_blocks(&provider, 5, 3); + + // Corrupt: add 2 fake blocks pointing past EOF, then partial record + corrupt_sidecar_add_fake_blocks(&sidecar_path, 2, 100); // Invalid offsets + corrupt_sidecar_partial_write(&sidecar_path, 10); // Partial record + + let file_size = std::fs::metadata(&sidecar_path).unwrap().len(); + assert_eq!(file_size, 5 * 16 + 2 * 16 + 10, "5 valid + 2 fake + 10 partial"); + + // Reopen - healing should: + // 1. Truncate partial (10 bytes) + // 2. Clamp to header's 5 blocks (remove 2 fake blocks) + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + + assert_eq!(get_sidecar_block_count(&sidecar_path), 5); + assert_eq!(get_header_block_count(&provider, 0), 5); + } + + // ==================== REGRESSION TESTS ==================== + + #[test] + fn test_prune_double_decrement_regression() { + // REGRESSION: Previously, healing called set_changeset_offsets_len() then prune(), + // causing double decrement. Now we only call prune() which handles both. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Write 10 blocks + let sidecar_path = write_test_blocks(&provider, 10, 5); + + // Simulate prune crash: sidecar at 7, header should be updated from 10 to 7 + truncate_sidecar(&sidecar_path, 7); + + // Reopen - healing fixes header + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + { + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + // Writer commits healed header on open + } + // Reopen provider to see committed changes + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + + // Should be exactly 7, not 7 - (10-7) = 4 (double decrement bug) + assert_eq!(get_header_block_count(&provider, 0), 7); + assert_eq!(get_sidecar_block_count(&sidecar_path), 7); + } + + #[test] + fn test_prune_with_uncommitted_sidecar_records() { + // REGRESSION: truncate_changesets() previously read file size from disk instead of + // using the committed length from header. After a crash, sidecar may have uncommitted + // records. The fix uses ChangesetOffsetReader::new() with explicit length. + // + // SCENARIO: Simulate crash where sidecar has more records than header claims, + // then prune. The prune should use header's block count, not sidecar's. + + let (static_dir, _) = create_test_static_files_dir(); + let provider = setup_test_provider(&static_dir, 100); + + // Step 1: Write 10 blocks with 5 changes each, commit + let sidecar_path = write_test_blocks(&provider, 10, 5); + assert_eq!(get_sidecar_block_count(&sidecar_path), 10); + assert_eq!(get_header_block_count(&provider, 0), 10); + assert_eq!(get_nippy_row_count(&provider, 0), 50); + + // Step 2: Corrupt sidecar by adding 3 fake blocks that point to valid but + // uncommitted offsets. This simulates a crash where sidecar was synced but + // header wasn't committed. + corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 50); + assert_eq!(get_sidecar_block_count(&sidecar_path), 13, "Sidecar has 3 uncommitted blocks"); + + // Step 3: Reopen provider - healing runs and clamps sidecar to header's 10 + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + { + let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + } + assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Healing clamped sidecar to 10"); + assert_eq!(get_header_block_count(&provider, 0), 10); + + // Step 4: Prune to block 6 (keep blocks 0-6, remove 7-9) + { + let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap(); + writer.prune_account_changesets(6).unwrap(); + writer.commit().unwrap(); + } + + // Step 5: Verify sidecar has 7 blocks, header has 7 blocks, rows are correct + drop(provider); + let provider = setup_test_provider(&static_dir, 100); + + assert_eq!(get_sidecar_block_count(&sidecar_path), 7, "Sidecar should have 7 blocks"); + assert_eq!(get_header_block_count(&provider, 0), 7, "Header should have 7 blocks"); + assert_eq!( + get_nippy_row_count(&provider, 0), + 35, + "Should have 7 blocks * 5 changes = 35 rows" + ); + } +}