Mirror of https://github.com/paradigmxyz/reth.git, synced 2026-02-19 03:04:27 -05:00
feat(static-file): incremental changeset offset storage (#21596)

Co-authored-by: Amp <amp@ampcode.com>
Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>

Committed by GitHub
Parent: 3bfd002477
Commit: 6953971c2f
.changelog/quiet-foxes-dance.md (new file, 6 lines)
@@ -0,0 +1,6 @@
---
reth-static-file-types: patch
reth-provider: patch
---

Move changeset offsets from segment header to external `.csoff` sidecar file for incremental writes and crash recovery.
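The core of the change is the `.csoff` record layout defined in the new `changeset_offsets` module below: fixed-width 16-byte records of two little-endian u64 values. As a minimal standalone sketch (assuming nothing beyond that stated layout; these helper names are not part of the PR), one record round-trips like this:

// Sketch of the 16-byte `.csoff` record layout; illustrative only, not reth code.
fn encode_record(offset: u64, num_changes: u64) -> [u8; 16] {
    let mut buf = [0u8; 16];
    buf[..8].copy_from_slice(&offset.to_le_bytes());
    buf[8..].copy_from_slice(&num_changes.to_le_bytes());
    buf
}

fn decode_record(buf: &[u8; 16]) -> (u64, u64) {
    (
        u64::from_le_bytes(buf[..8].try_into().unwrap()),
        u64::from_le_bytes(buf[8..].try_into().unwrap()),
    )
}

fn main() {
    let rec = encode_record(100, 5);
    assert_eq!(decode_record(&rec), (100, 5));
    // Because records are fixed-width, record N lives at byte offset N * 16:
    // this is what gives the sidecar O(1) random access and append-only writes.
}

Fixed-width records also keep crash recovery simple: any file length that is not a multiple of 16 can only be a torn final record, so healing is a single truncate back to the previous record boundary.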
Cargo.lock (generated, 118 lines changed)
@@ -516,9 +516,9 @@ dependencies = [

 [[package]]
 name = "alloy-rlp"
-version = "0.3.12"
+version = "0.3.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f70d83b765fdc080dbcd4f4db70d8d23fe4761f2f02ebfa9146b833900634b4"
+checksum = "e93e50f64a77ad9c5470bf2ad0ca02f228da70c792a8f06634801e202579f35e"
 dependencies = [
  "alloy-rlp-derive",
  "arrayvec",
@@ -527,9 +527,9 @@ dependencies = [

 [[package]]
 name = "alloy-rlp-derive"
-version = "0.3.12"
+version = "0.3.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "64b728d511962dda67c1bc7ea7c03736ec275ed2cf4c35d9585298ac9ccf3b73"
+checksum = "ce8849c74c9ca0f5a03da1c865e3eb6f768df816e67dd3721a398a8a7e398011"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1001,7 +1001,7 @@ version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc"
 dependencies = [
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -1012,7 +1012,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d"
 dependencies = [
  "anstyle",
  "once_cell_polyfill",
- "windows-sys 0.60.2",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -2175,9 +2175,9 @@ dependencies = [

 [[package]]
 name = "clap"
-version = "4.5.56"
+version = "4.5.57"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a75ca66430e33a14957acc24c5077b503e7d374151b2b4b3a10c83b4ceb4be0e"
+checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a"
 dependencies = [
  "clap_builder",
  "clap_derive",
@@ -2185,9 +2185,9 @@ dependencies = [

 [[package]]
 name = "clap_builder"
-version = "4.5.56"
+version = "4.5.57"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "793207c7fa6300a0608d1080b858e5fdbe713cdc1c8db9fb17777d8a13e63df0"
+checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238"
 dependencies = [
  "anstream",
  "anstyle",
@@ -3093,7 +3093,7 @@ dependencies = [
  "libc",
  "option-ext",
  "redox_users 0.5.2",
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -3423,7 +3423,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
 dependencies = [
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -4006,9 +4006,9 @@ checksum = "1d674e81391d1e1ab681a28d99df07927c6d4aa5b027d7da16ba32d1d21ecd99"

 [[package]]
 name = "flate2"
-version = "1.1.8"
+version = "1.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b375d6465b98090a5f25b1c7703f3859783755aa9a80433b36e0379a3ec2f369"
+checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c"
 dependencies = [
  "crc32fast",
  "miniz_oxide",
@@ -4693,7 +4693,7 @@ dependencies = [
  "tokio",
  "tokio-rustls",
  "tower-service",
- "webpki-roots 1.0.5",
+ "webpki-roots 1.0.6",
 ]

 [[package]]
@@ -4711,14 +4711,13 @@ dependencies = [

 [[package]]
 name = "hyper-util"
-version = "0.1.19"
+version = "0.1.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f"
+checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0"
 dependencies = [
  "base64 0.22.1",
  "bytes",
  "futures-channel",
  "futures-core",
  "futures-util",
  "http",
  "http-body",
@@ -4999,9 +4998,9 @@ dependencies = [

 [[package]]
 name = "insta"
-version = "1.46.2"
+version = "1.46.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38c91d64f9ad425e80200a50a0e8b8a641680b44e33ce832efe5b8bc65161b07"
+checksum = "e82db8c87c7f1ccecb34ce0c24399b8a73081427f3c7c50a5d597925356115e4"
 dependencies = [
  "console",
  "once_cell",
@@ -5033,9 +5032,9 @@ dependencies = [

 [[package]]
 name = "interprocess"
-version = "2.2.3"
+version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d"
+checksum = "7b00d05442c2106c75b7410f820b152f61ec0edc7befcb9b381b673a20314753"
 dependencies = [
  "doctest-file",
  "futures-core",
@@ -5091,7 +5090,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46"
 dependencies = [
  "hermit-abi",
  "libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -5683,6 +5682,12 @@ dependencies = [
  "libc",
 ]

+[[package]]
+name = "mach2"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dae608c151f68243f2b000364e1f7b186d9c29845f7d2d85bd31b9ad77ad552b"
+
 [[package]]
 name = "macro-string"
 version = "0.1.4"
@@ -5788,13 +5793,13 @@ dependencies = [

 [[package]]
 name = "metrics-process"
-version = "2.4.2"
+version = "2.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f615e08e049bd14a44c4425415782efb9bcd479fc1e19ddeb971509074c060d0"
+checksum = "4268d87f64a752f5a651314fc683f04da10be65701ea3e721ba4d74f79163cac"
 dependencies = [
  "libc",
  "libproc",
- "mach2",
+ "mach2 0.6.0",
  "metrics",
  "once_cell",
  "procfs",
@@ -6045,7 +6050,7 @@ version = "0.50.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
 dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -7406,9 +7411,9 @@ dependencies = [

 [[package]]
 name = "regex"
-version = "1.12.2"
+version = "1.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "843bc0191f75f3e22651ae5f1e72939ab2f72a4bc30fa80a066bd66edefc24d4"
+checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -7418,9 +7423,9 @@ dependencies = [

 [[package]]
 name = "regex-automata"
-version = "0.4.13"
+version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5276caf25ac86c8d810222b3dbb938e512c55c6831a10f3e6ed1c93b84041f1c"
+checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -7429,9 +7434,9 @@ dependencies = [

 [[package]]
 name = "regex-syntax"
-version = "0.8.8"
+version = "0.8.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
+checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c"

 [[package]]
 name = "regress"
@@ -7489,7 +7494,7 @@ dependencies = [
  "wasm-bindgen-futures",
  "wasm-streams",
  "web-sys",
- "webpki-roots 1.0.5",
+ "webpki-roots 1.0.6",
 ]

 [[package]]
@@ -9639,6 +9644,7 @@ dependencies = [
  "reth-ethereum-engine-primitives",
  "reth-ethereum-primitives",
  "reth-execution-types",
+ "reth-fs-util",
  "reth-metrics",
  "reth-nippy-jar",
  "reth-node-types",
@@ -10347,6 +10353,8 @@ dependencies = [
  "serde",
  "serde_json",
  "strum",
+ "tempfile",
+ "tracing",
 ]

 [[package]]
@@ -10993,9 +11001,9 @@ dependencies = [

 [[package]]
 name = "rlimit"
-version = "0.10.2"
+version = "0.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a"
+checksum = "f35ee2729c56bb610f6dba436bf78135f728b7373bdffae2ec815b2d3eb98cc3"
 dependencies = [
  "libc",
 ]
@@ -11186,7 +11194,7 @@ dependencies = [
  "errno",
  "libc",
  "linux-raw-sys",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -11326,9 +11334,9 @@ dependencies = [

 [[package]]
 name = "schemars"
-version = "1.2.0"
+version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "54e910108742c57a770f492731f99be216a52fadd361b06c8fb59d74ccc267d2"
+checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc"
 dependencies = [
  "dyn-clone",
  "ref-cast",
@@ -11590,7 +11598,7 @@ dependencies = [
  "indexmap 1.9.3",
  "indexmap 2.13.0",
  "schemars 0.9.0",
- "schemars 1.2.0",
+ "schemars 1.2.1",
  "serde_core",
  "serde_json",
  "serde_with_macros",
@@ -12035,7 +12043,7 @@ dependencies = [
  "getrandom 0.3.4",
  "once_cell",
  "rustix",
- "windows-sys 0.52.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -12656,7 +12664,7 @@ dependencies = [
  "cfg-if",
  "itoa",
  "libc",
- "mach2",
+ "mach2 0.5.0",
  "memmap2",
  "smallvec",
  "tracing-core",
@@ -13221,14 +13229,14 @@ version = "0.26.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e"
 dependencies = [
- "webpki-root-certs 1.0.5",
+ "webpki-root-certs 1.0.6",
 ]

 [[package]]
 name = "webpki-root-certs"
-version = "1.0.5"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc"
+checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca"
 dependencies = [
  "rustls-pki-types",
 ]
@@ -13239,14 +13247,14 @@ version = "0.26.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9"
 dependencies = [
- "webpki-roots 1.0.5",
+ "webpki-roots 1.0.6",
 ]

 [[package]]
 name = "webpki-roots"
-version = "1.0.5"
+version = "1.0.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c"
+checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed"
 dependencies = [
  "rustls-pki-types",
 ]
@@ -13279,7 +13287,7 @@ version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.61.2",
 ]

 [[package]]
@@ -13807,18 +13815,18 @@ dependencies = [

 [[package]]
 name = "zerocopy"
-version = "0.8.37"
+version = "0.8.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7456cf00f0685ad319c5b1693f291a650eaf345e941d082fc4e03df8a03996ac"
+checksum = "57cf3aa6855b23711ee9852dfc97dfaa51c45feaba5b645d0c777414d494a961"
 dependencies = [
  "zerocopy-derive",
 ]

 [[package]]
 name = "zerocopy-derive"
-version = "0.8.37"
+version = "0.8.38"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1328722bbf2115db7e19d69ebcc15e795719e2d66b60827c6a69a117365e37a0"
+checksum = "8a616990af1a287837c4fe6596ad77ef57948f787e46ce28e166facc0cc1cb75"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -13902,9 +13910,9 @@ dependencies = [

 [[package]]
 name = "zmij"
-version = "1.0.18"
+version = "1.0.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1966f8ac2c1f76987d69a74d0e0f929241c10e78136434e3be70ff7f58f64214"
+checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445"

 [[package]]
 name = "zstd"
@@ -67,6 +67,7 @@ tempfile.workspace = true

 [features]
 default = []
+edge = ["reth-provider/edge"]
 serde = [
     "reth-exex-types/serde",
     "reth-revm/serde",
@@ -38,3 +38,4 @@ assert_matches.workspace = true
 tempfile.workspace = true

 [features]
+edge = ["reth-stages/edge"]
@@ -13,6 +13,7 @@ workspace = true

 [dependencies]
 alloy-primitives.workspace = true
+tracing = { workspace = true, optional = true }

 clap = { workspace = true, features = ["derive"], optional = true }
 fixed-map.workspace = true
@@ -25,6 +26,7 @@ strum = { workspace = true, features = ["derive"] }
 reth-nippy-jar.workspace = true
 serde_json.workspace = true
 insta.workspace = true
+tempfile.workspace = true

 [features]
 default = ["std"]
@@ -36,5 +38,7 @@ std = [
     "strum/std",
     "serde_json/std",
     "fixed-map/std",
+    "dep:tracing",
+    "tracing?/std",
 ]
 clap = ["dep:clap"]
crates/static-file/types/src/changeset_offsets.rs (new file, 437 lines)
@@ -0,0 +1,437 @@
//! Changeset offset sidecar file I/O.
//!
//! Provides append-only writing and O(1) random-access reading for changeset offsets.
//! The file format is fixed-width 16-byte records: `[offset: u64 LE][num_changes: u64 LE]`.

use crate::ChangesetOffset;
use std::{
    fs::{File, OpenOptions},
    io::{self, Read, Seek, SeekFrom, Write},
    path::Path,
};

/// Writer for appending changeset offsets to a sidecar file.
#[derive(Debug)]
pub struct ChangesetOffsetWriter {
    file: File,
    /// Number of records written.
    records_written: u64,
}

impl ChangesetOffsetWriter {
    /// Record size in bytes.
    const RECORD_SIZE: usize = 16;

    /// Opens or creates the changeset offset file for appending.
    ///
    /// The file is healed to match `committed_len` (from the segment header):
    /// - Partial records (from crash mid-write) are truncated to record boundary
    /// - Extra complete records (from crash after sidecar sync but before header commit) are
    ///   truncated to match the committed length
    /// - If the file has fewer records than committed, returns an error (data corruption)
    ///
    /// This mirrors `NippyJar`'s healing behavior where config/header is the commit boundary.
    pub fn new(path: impl AsRef<Path>, committed_len: u64) -> io::Result<Self> {
        let file = OpenOptions::new()
            .create(true)
            .truncate(false)
            .read(true)
            .write(true)
            .open(path.as_ref())?;

        let file_len = file.metadata()?.len();
        let remainder = file_len % Self::RECORD_SIZE as u64;

        // First, truncate any partial record from crash mid-write
        let aligned_len = if remainder != 0 {
            let truncated_len = file_len - remainder;
            tracing::warn!(
                target: "reth::static_file",
                path = %path.as_ref().display(),
                original_len = file_len,
                truncated_len,
                "Truncating partial changeset offset record"
            );
            file.set_len(truncated_len)?;
            file.sync_all()?; // Sync required for crash safety
            truncated_len
        } else {
            file_len
        };

        let records_in_file = aligned_len / Self::RECORD_SIZE as u64;

        // Heal sidecar to match committed header length
        match records_in_file.cmp(&committed_len) {
            std::cmp::Ordering::Greater => {
                // Sidecar has uncommitted records from a crash - truncate them
                let target_len = committed_len * Self::RECORD_SIZE as u64;
                tracing::warn!(
                    target: "reth::static_file",
                    path = %path.as_ref().display(),
                    sidecar_records = records_in_file,
                    committed_len,
                    "Truncating uncommitted changeset offset records after crash recovery"
                );
                file.set_len(target_len)?;
                file.sync_all()?; // Sync required for crash safety
            }
            std::cmp::Ordering::Less => {
                // INVARIANT VIOLATION: This should be impossible if healing ran correctly.
                //
                // All code paths call `heal_changeset_sidecar()` before this function, which
                // validates the sidecar against NippyJar state and corrects the header to match
                // the actual file size. Therefore, `committed_len` should always equal or exceed
                // `records_in_file` when this function is called.
                //
                // If we reach this error, it indicates:
                // - A bug in the healing logic (header not corrected properly)
                // - This function was called directly without going through healing
                // - External corruption occurred between healing and opening (extremely unlikely)
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "INVARIANT VIOLATION: Changeset offset sidecar has {} records but header expects {} \
                         (healing should have prevented this - possible bug in healing logic): {}",
                        records_in_file,
                        committed_len,
                        path.as_ref().display()
                    ),
                ));
            }
            std::cmp::Ordering::Equal => {}
        }

        let records_written = committed_len;
        let file = OpenOptions::new().create(true).append(true).open(path)?;

        Ok(Self { file, records_written })
    }

    /// Appends a single changeset offset record.
    pub fn append(&mut self, offset: &ChangesetOffset) -> io::Result<()> {
        let mut buf = [0u8; Self::RECORD_SIZE];
        buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
        buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
        self.file.write_all(&buf)?;
        self.records_written += 1;
        Ok(())
    }

    /// Appends multiple changeset offset records.
    pub fn append_many(&mut self, offsets: &[ChangesetOffset]) -> io::Result<()> {
        for offset in offsets {
            self.append(offset)?;
        }
        Ok(())
    }

    /// Syncs all data to disk. Must be called before committing the header.
    pub fn sync(&mut self) -> io::Result<()> {
        self.file.sync_all()
    }

    /// Truncates the file to contain exactly `len` records and syncs to disk.
    /// Used after prune operations to reclaim space.
    ///
    /// The sync is required for crash safety - without it, a crash could
    /// resurrect the old file length.
    pub fn truncate(&mut self, len: u64) -> io::Result<()> {
        self.file.set_len(len * Self::RECORD_SIZE as u64)?;
        self.file.sync_all()?;
        self.records_written = len;
        Ok(())
    }

    /// Returns the number of records in the file.
    pub const fn len(&self) -> u64 {
        self.records_written
    }

    /// Returns true if the file is empty.
    pub const fn is_empty(&self) -> bool {
        self.records_written == 0
    }
}

/// Reader for changeset offsets with O(1) random access.
#[derive(Debug)]
pub struct ChangesetOffsetReader {
    file: File,
    /// Cached file length in records.
    len: u64,
}

impl ChangesetOffsetReader {
    /// Record size in bytes.
    const RECORD_SIZE: usize = 16;

    /// Opens the changeset offset file for reading with an explicit length.
    ///
    /// The `len` parameter (from header metadata) bounds the reader - any records
    /// beyond this length are ignored. This ensures we only read committed data.
    pub fn new(path: impl AsRef<Path>, len: u64) -> io::Result<Self> {
        let file = File::open(path)?;
        Ok(Self { file, len })
    }

    /// Reads a single changeset offset by block index.
    /// Returns None if index is out of bounds.
    pub fn get(&mut self, block_index: u64) -> io::Result<Option<ChangesetOffset>> {
        if block_index >= self.len {
            return Ok(None);
        }

        let byte_pos = block_index * Self::RECORD_SIZE as u64;
        self.file.seek(SeekFrom::Start(byte_pos))?;

        let mut buf = [0u8; Self::RECORD_SIZE];
        self.file.read_exact(&mut buf)?;

        let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
        let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());

        Ok(Some(ChangesetOffset::new(offset, num_changes)))
    }

    /// Reads a range of changeset offsets.
    pub fn get_range(&mut self, start: u64, end: u64) -> io::Result<Vec<ChangesetOffset>> {
        let end = end.min(self.len);
        if start >= end {
            return Ok(Vec::new());
        }

        let count = (end - start) as usize;
        let byte_pos = start * Self::RECORD_SIZE as u64;
        self.file.seek(SeekFrom::Start(byte_pos))?;

        let mut result = Vec::with_capacity(count);
        let mut buf = [0u8; Self::RECORD_SIZE];

        for _ in 0..count {
            self.file.read_exact(&mut buf)?;
            let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
            let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
            result.push(ChangesetOffset::new(offset, num_changes));
        }

        Ok(result)
    }

    /// Returns the number of valid records.
    pub const fn len(&self) -> u64 {
        self.len
    }

    /// Returns true if there are no records.
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_write_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write (new file, committed_len=0)
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 3)).unwrap();
            writer.append(&ChangesetOffset::new(8, 10)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Read
        {
            let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
            assert_eq!(reader.len(), 3);

            let entry = reader.get(0).unwrap().unwrap();
            assert_eq!(entry.offset(), 0);
            assert_eq!(entry.num_changes(), 5);

            let entry = reader.get(1).unwrap().unwrap();
            assert_eq!(entry.offset(), 5);
            assert_eq!(entry.num_changes(), 3);

            let entry = reader.get(2).unwrap().unwrap();
            assert_eq!(entry.offset(), 8);
            assert_eq!(entry.num_changes(), 10);

            assert!(reader.get(3).unwrap().is_none());
        }
    }

    #[test]
    fn test_truncate() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
        writer.append(&ChangesetOffset::new(0, 1)).unwrap();
        writer.append(&ChangesetOffset::new(1, 2)).unwrap();
        writer.append(&ChangesetOffset::new(3, 3)).unwrap();
        writer.sync().unwrap();

        writer.truncate(2).unwrap();
        assert_eq!(writer.len(), 2);

        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);
        assert!(reader.get(2).unwrap().is_none());
    }

    #[test]
    fn test_partial_record_recovery() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 full record (16 bytes) + 8 trailing bytes (partial record)
        {
            let mut file = std::fs::File::create(&path).unwrap();
            // Full record: offset=100, num_changes=5
            file.write_all(&100u64.to_le_bytes()).unwrap();
            file.write_all(&5u64.to_le_bytes()).unwrap();
            // Partial record: only 8 bytes (incomplete)
            file.write_all(&200u64.to_le_bytes()).unwrap();
            file.sync_all().unwrap();
        }

        // Verify file has 24 bytes before opening with writer
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 24);

        // Open with writer, committed_len=1 (header committed 1 record)
        // Should truncate partial record and match committed length
        let writer = ChangesetOffsetWriter::new(&path, 1).unwrap();
        assert_eq!(writer.len(), 1);

        // Verify file was truncated to 16 bytes
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 16);

        // Verify the complete record is readable
        let mut reader = ChangesetOffsetReader::new(&path, 1).unwrap();
        assert_eq!(reader.len(), 1);
        let entry = reader.get(0).unwrap().unwrap();
        assert_eq!(entry.offset(), 100);
        assert_eq!(entry.num_changes(), 5);
    }

    #[test]
    fn test_len_bounds_reads() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 3 records
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 10)).unwrap();
            writer.append(&ChangesetOffset::new(10, 20)).unwrap();
            writer.append(&ChangesetOffset::new(30, 30)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Open with len=2, ignoring the 3rd record
        let mut reader = ChangesetOffsetReader::new(&path, 2).unwrap();
        assert_eq!(reader.len(), 2);

        // First two records should be readable
        let entry0 = reader.get(0).unwrap().unwrap();
        assert_eq!(entry0.offset(), 0);
        assert_eq!(entry0.num_changes(), 10);

        let entry1 = reader.get(1).unwrap().unwrap();
        assert_eq!(entry1.offset(), 10);
        assert_eq!(entry1.num_changes(), 20);

        // Third record should be out of bounds (due to len=2)
        assert!(reader.get(2).unwrap().is_none());

        // get_range should also respect the len bound
        let range = reader.get_range(0, 5).unwrap();
        assert_eq!(range.len(), 2);
    }

    #[test]
    fn test_truncate_uncommitted_records_on_open() {
        // Simulates crash recovery where sidecar has more records than committed header length.
        // ChangesetOffsetWriter::new() should automatically truncate to committed_len.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Simulate: wrote 3 records, synced sidecar, but header only committed len=2
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.append(&ChangesetOffset::new(5, 10)).unwrap();
            writer.append(&ChangesetOffset::new(15, 7)).unwrap(); // uncommitted
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // On "restart", new() heals by truncating to committed length
        let committed_len = 2u64;
        {
            let writer = ChangesetOffsetWriter::new(&path, committed_len).unwrap();
            assert_eq!(writer.len(), 2); // Healed to committed length
        }

        // Verify file is now correct length and new appends go to the right place
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 2).unwrap();
            assert_eq!(writer.len(), 2);

            // Append a new record - should be at index 2, not index 3
            writer.append(&ChangesetOffset::new(15, 20)).unwrap();
            writer.sync().unwrap();
            assert_eq!(writer.len(), 3);
        }

        // Verify the records are correct
        {
            let mut reader = ChangesetOffsetReader::new(&path, 3).unwrap();
            assert_eq!(reader.len(), 3);

            let entry0 = reader.get(0).unwrap().unwrap();
            assert_eq!(entry0.offset(), 0);
            assert_eq!(entry0.num_changes(), 5);

            let entry1 = reader.get(1).unwrap().unwrap();
            assert_eq!(entry1.offset(), 5);
            assert_eq!(entry1.num_changes(), 10);

            // This should be the NEW record, not the old uncommitted one
            let entry2 = reader.get(2).unwrap().unwrap();
            assert_eq!(entry2.offset(), 15);
            assert_eq!(entry2.num_changes(), 20); // Not 7 from the old uncommitted record
        }
    }

    #[test]
    fn test_sidecar_shorter_than_committed_errors() {
        // If sidecar has fewer records than committed, it's data corruption - should error.
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.csoff");

        // Write 1 record
        {
            let mut writer = ChangesetOffsetWriter::new(&path, 0).unwrap();
            writer.append(&ChangesetOffset::new(0, 5)).unwrap();
            writer.sync().unwrap();
        }

        // Try to open with committed_len=3 (header claims more than file has)
        let result = ChangesetOffsetWriter::new(&path, 3);
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}
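End-to-end, the new API composes as follows. A usage sketch (the types and signatures come from `changeset_offsets.rs` above; the temp-dir path and the `main` wrapper are hypothetical): the writer appends and syncs records before the segment header commits `changeset_offsets_len`, and a reader bounded by that committed length serves O(1) lookups.

use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter};

fn main() -> std::io::Result<()> {
    // Hypothetical location; real sidecars sit next to the segment's data file.
    let path = std::env::temp_dir().join("example.csoff");

    // Fresh file: nothing committed yet, so committed_len = 0.
    let mut writer = ChangesetOffsetWriter::new(&path, 0)?;
    writer.append(&ChangesetOffset::new(0, 5))?; // block 0: changeset rows 0..5
    writer.append(&ChangesetOffset::new(5, 3))?; // block 1: changeset rows 5..8
    writer.sync()?; // must land on disk before the header commit

    // After the header commits changeset_offsets_len = 2, a reader bounded by
    // that length only ever sees committed records.
    let mut reader = ChangesetOffsetReader::new(&path, 2)?;
    assert_eq!(reader.get(1)?.map(|o| o.changeset_range()), Some(5..8));
    Ok(())
}

Storing only the length in the committed header, rather than the whole offsets `Vec` as before, is what makes writes incremental: each block appends one 16-byte record instead of re-serializing every offset into the segment header.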
@@ -15,11 +15,18 @@ mod compression;
 mod event;
 mod segment;

+#[cfg(feature = "std")]
+mod changeset_offsets;
+#[cfg(feature = "std")]
+pub use changeset_offsets::{ChangesetOffsetReader, ChangesetOffsetWriter};
+
 use alloy_primitives::BlockNumber;
 pub use compression::Compression;
 use core::ops::RangeInclusive;
 pub use event::StaticFileProducerEvent;
-pub use segment::{SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
+pub use segment::{
+    ChangesetOffset, SegmentConfig, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
+};

 /// Map keyed by [`StaticFileSegment`].
 pub type StaticFileMap<T> = alloc::boxed::Box<fixed_map::Map<StaticFileSegment, T>>;
@@ -1,5 +1,5 @@
 use crate::{find_fixed_range, BlockNumber, Compression};
-use alloc::{format, string::String, vec::Vec};
+use alloc::{format, string::String};
 use alloy_primitives::TxNumber;
 use core::{
     ops::{Range, RangeInclusive},
@@ -238,6 +238,11 @@ pub struct ChangesetOffset {
 }

 impl ChangesetOffset {
+    /// Creates a new changeset offset.
+    pub const fn new(offset: u64, num_changes: u64) -> Self {
+        Self { offset, num_changes }
+    }
+
     /// Returns the start offset for the row for this block
     pub const fn offset(&self) -> u64 {
         self.offset
@@ -252,6 +257,11 @@ impl ChangesetOffset {
     pub const fn changeset_range(&self) -> Range<u64> {
         self.offset..(self.offset + self.num_changes)
     }
+
+    /// Increments the number of changes by 1.
+    pub const fn increment_num_changes(&mut self) {
+        self.num_changes += 1;
+    }
 }

 /// A segment header that contains information common to all segments. Used for storage.
@@ -269,7 +279,7 @@ pub struct SegmentHeader {
     /// Segment type
     segment: StaticFileSegment,
     /// List of offsets, for where each block's changeset starts.
-    changeset_offsets: Option<Vec<ChangesetOffset>>,
+    changeset_offsets_len: u64,
 }

 struct SegmentHeaderVisitor;
@@ -298,21 +308,18 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
         let segment: StaticFileSegment =
             seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(3, &self))?;

-        let changeset_offsets = if segment.is_change_based() {
-            // Try to read the 5th field (changeset_offsets)
-            // If it doesn't exist (old format), this will return None
-            match seq.next_element()? {
-                Some(Some(offsets)) => Some(offsets),
-                // Changesets should have offsets
-                Some(None) => None,
+        let changeset_offsets_len = if segment.is_change_based() {
+            // Try to read the 5th field (changeset_offsets_len)
+            match seq.next_element::<u64>()? {
+                Some(len) => len,
                 None => {
                     return Err(serde::de::Error::custom(
-                        "changeset_offsets should exist for static files",
+                        "changeset_offsets_len should exist for changeset static files",
                     ))
                 }
             }
         } else {
-            None
+            0
         };

         Ok(SegmentHeader {
@@ -320,7 +327,7 @@ impl<'de> Visitor<'de> for SegmentHeaderVisitor {
             block_range,
             tx_range,
             segment,
-            changeset_offsets,
+            changeset_offsets_len,
         })
     }
 }
@@ -355,7 +362,7 @@ impl Serialize for SegmentHeader {
         state.serialize_field("segment", &self.segment)?;

         if self.segment.is_change_based() {
-            state.serialize_field("changeset_offsets", &self.changeset_offsets)?;
+            state.serialize_field("changeset_offsets", &self.changeset_offsets_len)?;
         }

         state.end()
@@ -370,7 +377,7 @@ impl SegmentHeader {
         tx_range: Option<SegmentRangeInclusive>,
         segment: StaticFileSegment,
     ) -> Self {
-        Self { expected_block_range, block_range, tx_range, segment, changeset_offsets: None }
+        Self { expected_block_range, block_range, tx_range, segment, changeset_offsets_len: 0 }
     }

     /// Returns the static file segment kind.
@@ -393,9 +400,20 @@ impl SegmentHeader {
         self.tx_range
     }

-    /// Returns the changeset offsets.
-    pub const fn changeset_offsets(&self) -> Option<&Vec<ChangesetOffset>> {
-        self.changeset_offsets.as_ref()
+    /// Returns the number of changeset offset entries.
+    /// The actual offsets are stored in the `.csoff` sidecar file.
+    pub const fn changeset_offsets_len(&self) -> u64 {
+        self.changeset_offsets_len
     }

+    /// Sets the changeset offsets length.
+    pub const fn set_changeset_offsets_len(&mut self, len: u64) {
+        self.changeset_offsets_len = len;
+    }
+
+    /// Increments the changeset offsets length by 1.
+    pub const fn increment_changeset_offsets_len(&mut self) {
+        self.changeset_offsets_len += 1;
+    }
+
     /// The expected block start of the segment.
@@ -453,7 +471,7 @@ impl SegmentHeader {
     }

     /// Increments block end range depending on segment
-    pub fn increment_block(&mut self) -> BlockNumber {
+    pub const fn increment_block(&mut self) -> BlockNumber {
         let block_num = if let Some(block_range) = &mut self.block_range {
             block_range.end += 1;
             block_range.end
@@ -465,20 +483,10 @@ impl SegmentHeader {
             self.expected_block_start()
         };

-        // For changeset segments, initialize an offset entry for the new block
+        // For changeset segments, increment the offsets length.
+        // The actual offset entry is written to the sidecar file by the caller.
         if self.segment.is_change_based() {
-            let offsets = self.changeset_offsets.get_or_insert_default();
-            // Calculate the offset for the new block
-            let new_offset = if let Some(last_offset) = offsets.last() {
-                // The new block starts after the last block's changes
-                last_offset.offset + last_offset.num_changes
-            } else {
-                // First block starts at offset 0
-                0
-            };
-
-            // Add a new offset entry with 0 changes initially
-            offsets.push(ChangesetOffset { offset: new_offset, num_changes: 0 });
+            self.changeset_offsets_len += 1;
         }

         block_num
@@ -495,23 +503,11 @@ impl SegmentHeader {
         }
     }

-    /// Increments the latest block's number of changes.
-    pub fn increment_block_changes(&mut self) {
-        debug_assert!(self.segment().is_change_based());
-        if self.segment.is_change_based() {
-            let offsets = self.changeset_offsets.get_or_insert_with(Default::default);
-            if let Some(last_offset) = offsets.last_mut() {
-                last_offset.num_changes += 1;
-            } else {
-                // If offsets is empty, we are adding the first change for a block
-                // The offset for the first block is 0
-                offsets.push(ChangesetOffset { offset: 0, num_changes: 1 });
-            }
-        }
-    }
-
     /// Removes `num` elements from end of tx or block range.
-    pub fn prune(&mut self, num: u64) {
+    ///
+    /// For changeset segments, also decrements the changeset offsets length.
+    /// The caller must truncate the sidecar file accordingly.
+    pub const fn prune(&mut self, num: u64) {
         // Changesets also contain a block range, but are not strictly block-based
         if self.segment.is_block_or_change_based() {
             if let Some(range) = &mut self.block_range {
@@ -519,26 +515,18 @@ impl SegmentHeader {
                     self.block_range = None;
                     // Clear all changeset offsets if we're clearing all blocks
                     if self.segment.is_change_based() {
-                        self.changeset_offsets = None;
+                        self.changeset_offsets_len = 0;
                     }
                 } else {
                     let old_end = range.end;
                     range.end = range.end.saturating_sub(num);

-                    // Update changeset offsets for account changesets
-                    if self.segment.is_change_based() &&
-                        let Some(offsets) = &mut self.changeset_offsets
-                    {
+                    // Update changeset offsets length for changeset segments
+                    if self.segment.is_change_based() {
                         // Calculate how many blocks we're removing
                         let blocks_to_remove = old_end - range.end;
-                        // Remove the last `blocks_to_remove` entries from offsets
-                        let new_len = offsets.len().saturating_sub(blocks_to_remove as usize);
-                        offsets.truncate(new_len);
-
-                        // If we removed all offsets, set to None
-                        if offsets.is_empty() {
-                            self.changeset_offsets = None;
-                        }
+                        self.changeset_offsets_len =
+                            self.changeset_offsets_len.saturating_sub(blocks_to_remove);
                     }
                 }
             };
@@ -561,28 +549,24 @@ impl SegmentHeader {
         }
     }

-    /// Synchronizes changeset offsets with the current block range for account changeset segments.
+    /// Synchronizes changeset offsets length with the current block range for changeset segments.
     ///
     /// This should be called after modifying the block range when dealing with changeset segments
-    /// to ensure the offsets vector matches the block range size.
-    pub fn sync_changeset_offsets(&mut self) {
+    /// to ensure the offsets length matches the block range size.
+    /// The caller must also truncate the sidecar file accordingly.
+    pub const fn sync_changeset_offsets(&mut self) {
         if !self.segment.is_change_based() {
             return;
         }

         if let Some(block_range) = &self.block_range {
-            if let Some(offsets) = &mut self.changeset_offsets {
-                let expected_len = (block_range.end - block_range.start + 1) as usize;
-                if offsets.len() > expected_len {
-                    offsets.truncate(expected_len);
-                    if offsets.is_empty() {
-                        self.changeset_offsets = None;
-                    }
-                }
-            }
+            let expected_len = block_range.end - block_range.start + 1;
+            if self.changeset_offsets_len > expected_len {
+                self.changeset_offsets_len = expected_len;
+            }
         } else {
             // No block range means no offsets
-            self.changeset_offsets = None;
+            self.changeset_offsets_len = 0;
         }
     }
@@ -608,21 +592,23 @@ impl SegmentHeader {
         self.tx_start()
     }

-    /// Returns the `ChangesetOffset` corresponding for the given block, if it's in the block
-    /// range.
+    /// Returns the index into the sidecar file for a given block's changeset offset.
     ///
-    /// If it is not in the block range or the changeset list in the header does not contain a
-    /// value for the block, this returns `None`.
-    pub fn changeset_offset(&self, block: BlockNumber) -> Option<&ChangesetOffset> {
+    /// Returns `None` if the block is not in the block range.
+    /// To get the changeset offset, the caller must read the offset from the sidecar file at this
+    /// index.
+    pub fn changeset_offset_index(&self, block: BlockNumber) -> Option<u64> {
         let block_range = self.block_range()?;
         if !block_range.contains(block) {
-            return None
+            return None;
         }

-        let offsets = self.changeset_offsets.as_ref()?;
-        let index = (block - block_range.start()) as usize;
+        let index = block - block_range.start();
+        if index >= self.changeset_offsets_len {
+            return None;
+        }

-        offsets.get(index)
+        Some(index)
     }
 }
@@ -781,42 +767,42 @@ mod tests {
             block_range: Some(SegmentRangeInclusive::new(0, 100)),
             tx_range: None,
             segment: StaticFileSegment::Headers,
-            changeset_offsets: None,
+            changeset_offsets_len: 0,
         },
         SegmentHeader {
             expected_block_range: SegmentRangeInclusive::new(0, 200),
             block_range: None,
             tx_range: Some(SegmentRangeInclusive::new(0, 300)),
             segment: StaticFileSegment::Transactions,
-            changeset_offsets: None,
+            changeset_offsets_len: 0,
         },
         SegmentHeader {
             expected_block_range: SegmentRangeInclusive::new(0, 200),
             block_range: Some(SegmentRangeInclusive::new(0, 100)),
             tx_range: Some(SegmentRangeInclusive::new(0, 300)),
             segment: StaticFileSegment::Receipts,
-            changeset_offsets: None,
+            changeset_offsets_len: 0,
         },
         SegmentHeader {
             expected_block_range: SegmentRangeInclusive::new(0, 200),
             block_range: Some(SegmentRangeInclusive::new(0, 100)),
             tx_range: Some(SegmentRangeInclusive::new(0, 300)),
             segment: StaticFileSegment::TransactionSenders,
-            changeset_offsets: None,
+            changeset_offsets_len: 0,
         },
         SegmentHeader {
             expected_block_range: SegmentRangeInclusive::new(0, 200),
             block_range: Some(SegmentRangeInclusive::new(0, 100)),
             tx_range: Some(SegmentRangeInclusive::new(0, 300)),
             segment: StaticFileSegment::AccountChangeSets,
-            changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
+            changeset_offsets_len: 100,
         },
         SegmentHeader {
             expected_block_range: SegmentRangeInclusive::new(0, 200),
             block_range: Some(SegmentRangeInclusive::new(0, 100)),
             tx_range: None,
             segment: StaticFileSegment::StorageChangeSets,
-            changeset_offsets: Some(vec![ChangesetOffset { offset: 1, num_changes: 1 }; 100]),
+            changeset_offsets_len: 100,
         },
     ];
     // Check that we test all segments
@@ -2,4 +2,4 @@
 source: crates/static-file/types/src/segment.rs
 expression: "Bytes::from(serialized)"
 ---
-0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c01000000000000040000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
+0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000100000000000000002c0100000000000004000000640000000000000001000000000000000000000000000000000000000000000000
@@ -2,4 +2,4 @@
 source: crates/static-file/types/src/segment.rs
 expression: "Bytes::from(serialized)"
 ---
-0x01000000000000000000000000000000c800000000000000010000000000000000640000000000000000050000000164000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000100000000000000010000000000000001000000000000000000000000000000000000000000000000
+0x01000000000000000000000000000000c80000000000000001000000000000000064000000000000000005000000640000000000000001000000000000000000000000000000000000000000000000
@@ -59,6 +59,8 @@ const INDEX_FILE_EXTENSION: &str = "idx";
 const OFFSETS_FILE_EXTENSION: &str = "off";
 /// The file extension used for configuration files.
 pub const CONFIG_FILE_EXTENSION: &str = "conf";
+/// The file extension used for changeset offset sidecar files.
+pub const CHANGESET_OFFSETS_FILE_EXTENSION: &str = "csoff";

 /// A [`RefRow`] is a list of column value slices pointing to either an internal buffer or a
 /// memory-mapped file.
@@ -240,13 +242,22 @@ impl<H: NippyJarHeader> NippyJar<H> {
         self.path.with_extension(CONFIG_FILE_EXTENSION)
     }

+    /// Returns the path for the changeset offsets sidecar file.
+    pub fn changeset_offsets_path(&self) -> PathBuf {
+        self.path.with_extension(CHANGESET_OFFSETS_FILE_EXTENSION)
+    }
+
     /// Deletes from disk this [`NippyJar`] alongside every satellite file.
     pub fn delete(self) -> Result<(), NippyJarError> {
         // TODO(joshie): ensure consistency on unexpected shutdown

-        for path in
-            [self.data_path().into(), self.index_path(), self.offsets_path(), self.config_path()]
-        {
+        for path in [
+            self.data_path().into(),
+            self.index_path(),
+            self.offsets_path(),
+            self.config_path(),
+            self.changeset_offsets_path(),
+        ] {
             if path.exists() {
                 debug!(target: "nippy-jar", ?path, "Removing file.");
                 reth_fs_util::remove_file(path)?;
@@ -30,8 +30,10 @@ reth-nippy-jar.workspace = true
 reth-codecs.workspace = true
 reth-chain-state.workspace = true
 reth-node-types.workspace = true
-reth-static-file-types.workspace = true
+reth-static-file-types = { workspace = true, features = ["std"] }
+reth-fs-util.workspace = true
 reth-tasks.workspace = true

 # ethereum
 alloy-eips.workspace = true
 alloy-primitives.workspace = true
@@ -17,6 +17,7 @@ use reth_db::static_file::{
 use reth_db_api::table::{Decompress, Value};
 use reth_node_types::NodePrimitives;
 use reth_primitives_traits::{SealedHeader, SignedTransaction};
+use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader};
 use reth_storage_api::range_size_hint;
 use reth_storage_errors::provider::{ProviderError, ProviderResult};
 use std::{
@@ -90,6 +91,63 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> {
    pub fn size(&self) -> usize {
        self.jar.value().size()
    }

+   /// Reads a changeset offset from the sidecar file for a given block.
+   ///
+   /// Returns `None` if:
+   /// - The segment is not change-based
+   /// - The block is not in the block range
+   /// - The sidecar file doesn't exist
+   pub fn read_changeset_offset(
+       &self,
+       block_number: BlockNumber,
+   ) -> ProviderResult<Option<ChangesetOffset>> {
+       let header = self.user_header();
+       if !header.segment().is_change_based() {
+           return Ok(None);
+       }
+
+       let Some(index) = header.changeset_offset_index(block_number) else {
+           return Ok(None);
+       };
+
+       let csoff_path = self.data_path().with_extension("csoff");
+       if !csoff_path.exists() {
+           return Ok(None);
+       }
+
+       let len = header.changeset_offsets_len();
+       let mut reader =
+           ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
+       reader.get(index).map_err(ProviderError::other)
+   }
+
+   /// Reads all changeset offsets from the sidecar file.
+   ///
+   /// Returns `None` if:
+   /// - The segment is not change-based
+   /// - The sidecar file doesn't exist
+   pub fn read_changeset_offsets(&self) -> ProviderResult<Option<Vec<ChangesetOffset>>> {
+       let header = self.user_header();
+       if !header.segment().is_change_based() {
+           return Ok(None);
+       }
+
+       let len = header.changeset_offsets_len();
+       if len == 0 {
+           return Ok(Some(Vec::new()));
+       }
+
+       let csoff_path = self.data_path().with_extension("csoff");
+       if !csoff_path.exists() {
+           return Ok(None);
+       }
+
+       let mut reader =
+           ChangesetOffsetReader::new(&csoff_path, len).map_err(ProviderError::other)?;
+       let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
+       Ok(Some(offsets))
+   }
}

impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileJarProvider<'_, N> {
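// Editorial sketch (not part of this diff): a standalone decoder for one `.csoff`
// record. The 16-byte fixed-width layout, two little-endian `u64`s per block (row
// offset followed by change count), is inferred from the `% 16` alignment in the
// healing code and from the test helpers later in this commit.
use std::{
    fs::File,
    io::{Read, Seek, SeekFrom},
    path::Path,
};

/// Reads record `index` from a `.csoff` sidecar, returning `(offset, num_changes)`.
fn read_csoff_record(path: &Path, index: u64) -> std::io::Result<(u64, u64)> {
    let mut file = File::open(path)?;
    // Records are fixed-width, so block `index` lives at byte `index * 16`.
    file.seek(SeekFrom::Start(index * 16))?;
    let mut buf = [0u8; 16];
    file.read_exact(&mut buf)?;
    let offset = u64::from_le_bytes(buf[..8].try_into().unwrap());
    let num_changes = u64::from_le_bytes(buf[8..].try_into().unwrap());
    Ok((offset, num_changes))
}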
@@ -39,8 +39,8 @@ use reth_primitives_traits::{
use reth_prune_types::PruneSegment;
use reth_stages_types::PipelineTarget;
use reth_static_file_types::{
-   find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
-   StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
+   find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
+   SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
};
use reth_storage_api::{
    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
@@ -962,10 +962,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
    ) -> ProviderResult<SegmentHeader> {
        let fixed_block_range = self.find_fixed_range(segment, block);
        let key = (fixed_block_range.end(), segment);
+       let file = self.path.join(segment.filename(&fixed_block_range));
        let jar = if let Some((_, jar)) = self.map.remove(&key) {
            jar.jar
        } else {
-           let file = self.path.join(segment.filename(&fixed_block_range));
            debug!(
                target: "providers::static_file",
                ?file,

@@ -977,6 +977,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
        };

        let header = jar.user_header().clone();

+       // Delete the sidecar file for changeset segments before deleting the main jar
+       if segment.is_change_based() {
+           let csoff_path = file.with_extension("csoff");
+           if csoff_path.exists() {
+               std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
+           }
+       }
+
        jar.delete().map_err(ProviderError::other)?;

        // SAFETY: this is currently necessary to ensure that certain indexes like
@@ -2380,7 +2389,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
            Err(err) => return Err(err),
        };

-       if let Some(offset) = provider.user_header().changeset_offset(block_number) {
+       if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

@@ -2412,9 +2421,7 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
            Err(err) => return Err(err),
        };

-       let user_header = provider.user_header();
-
-       let Some(offset) = user_header.changeset_offset(block_number) else {
+       let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };
@@ -2473,12 +2480,19 @@ impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
    fn account_changeset_count(&self) -> ProviderResult<usize> {
        let mut count = 0;

        // iterate through static files and sum changeset metadata via each static file header
        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
        if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
-           for (_, header) in changeset_segments {
-               if let Some(changeset_offsets) = header.changeset_offsets() {
-                   for offset in changeset_offsets {
+           for (block_range, header) in changeset_segments {
+               let csoff_path = self
+                   .path
+                   .join(StaticFileSegment::AccountChangeSets.filename(block_range))
+                   .with_extension("csoff");
+               if csoff_path.exists() {
+                   let len = header.changeset_offsets_len();
+                   let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
+                       .map_err(ProviderError::other)?;
+                   let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
+                   for offset in offsets {
                        count += offset.num_changes() as usize;
                    }
                }
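// Worked example (editorial, not part of this diff): with sidecar records
// (offset, num_changes) = (0, 3), (3, 0), (3, 2), the loop above sums the second
// field of each record, so the segment contributes 3 + 0 + 2 = 5 to `count`.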
@@ -2504,7 +2518,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
            Err(err) => return Err(err),
        };

-       if let Some(offset) = provider.user_header().changeset_offset(block_number) {
+       if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

@@ -2537,8 +2551,7 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
            Err(err) => return Err(err),
        };

-       let user_header = provider.user_header();
-       let Some(offset) = user_header.changeset_offset(block_number) else {
+       let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };
@@ -2595,9 +2608,17 @@ impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {

        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
        if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
-           for (_, header) in changeset_segments {
-               if let Some(changeset_offsets) = header.changeset_offsets() {
-                   for offset in changeset_offsets {
+           for (block_range, header) in changeset_segments {
+               let csoff_path = self
+                   .path
+                   .join(StaticFileSegment::StorageChangeSets.filename(block_range))
+                   .with_extension("csoff");
+               if csoff_path.exists() {
+                   let len = header.changeset_offsets_len();
+                   let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
+                       .map_err(ProviderError::other)?;
+                   let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
+                   for offset in offsets {
                        count += offset.num_changes() as usize;
                    }
                }
@@ -11,6 +11,10 @@ mod writer;
pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};

mod metrics;

+#[cfg(test)]
+mod writer_tests;
+
use reth_nippy_jar::NippyJar;
use reth_static_file_types::{SegmentHeader, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
@@ -744,8 +748,9 @@ mod tests {
            .unwrap();

        // Check that the segment header has changeset offsets
-       assert!(provider.user_header().changeset_offsets().is_some());
-       let offsets = provider.user_header().changeset_offsets().unwrap();
+       let offsets = provider.read_changeset_offsets().unwrap();
+       assert!(offsets.is_some());
+       let offsets = offsets.unwrap();
        assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets

        // Verify each block has the expected number of changes

@@ -855,7 +860,8 @@ mod tests {
        let (static_dir, _) = create_test_static_files_dir();

        let blocks_per_file = 10;
-       let files_per_range = 3;
+       // 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
+       let files_per_range = 4;
        let file_set_count = 3;
        let initial_file_count = files_per_range * file_set_count;
        let tip = blocks_per_file * file_set_count - 1;

@@ -924,7 +930,7 @@ mod tests {
        )?;

        // Check offsets are valid
-       let offsets = provider.user_header().changeset_offsets();
+       let offsets = provider.read_changeset_offsets().unwrap();
        assert!(offsets.is_some(), "Should have changeset offsets");
    }

@@ -1088,8 +1094,9 @@ mod tests {
            .unwrap();

        // Check that the segment header has changeset offsets
-       assert!(provider.user_header().changeset_offsets().is_some());
-       let offsets = provider.user_header().changeset_offsets().unwrap();
+       let offsets = provider.read_changeset_offsets().unwrap();
+       assert!(offsets.is_some());
+       let offsets = offsets.unwrap();
        assert_eq!(offsets.len(), 10); // Should have 10 blocks worth of offsets

        // Verify each block has the expected number of changes

@@ -1190,7 +1197,8 @@ mod tests {
        let (static_dir, _) = create_test_static_files_dir();

        let blocks_per_file = 10;
-       let files_per_range = 3;
+       // 3 main files (jar, dat, idx) + 1 csoff sidecar file for changeset segments
+       let files_per_range = 4;
        let file_set_count = 3;
        let initial_file_count = files_per_range * file_set_count;
        let tip = blocks_per_file * file_set_count - 1;

@@ -1249,7 +1257,7 @@ mod tests {
            tip,
            None,
        )?;
-       let offsets = provider.user_header().changeset_offsets();
+       let offsets = provider.read_changeset_offsets()?;
        assert!(offsets.is_some(), "Should have changeset offsets");
    }

@@ -1352,4 +1360,42 @@ mod tests {
            }
        }
    }

+   #[test]
+   fn test_last_block_flushed_on_commit() {
+       let (static_dir, _) = create_test_static_files_dir();
+
+       let sf_rw = StaticFileProvider::<EthPrimitives>::read_write(&static_dir)
+           .expect("Failed to create static file provider");
+
+       let address = Address::from([5u8; 20]);
+       let key = B256::with_last_byte(1);
+
+       // Write changes for a single block without calling increment_block explicitly
+       // (append_storage_changeset calls it internally), then commit
+       {
+           let mut writer = sf_rw.latest_writer(StaticFileSegment::StorageChangeSets).unwrap();
+
+           // Append a single block's changeset (block 0)
+           writer
+               .append_storage_changeset(
+                   vec![StorageBeforeTx { address, key, value: U256::from(42) }],
+                   0,
+               )
+               .unwrap();
+
+           // Commit without any subsequent block - the current block's offset should be flushed
+           writer.commit().unwrap();
+       }
+
+       // Verify highest block is 0
+       let highest = sf_rw.get_highest_static_file_block(StaticFileSegment::StorageChangeSets);
+       assert_eq!(highest, Some(0), "Should have block 0 after commit");
+
+       // Verify the data is actually readable via the high-level API
+       let result = sf_rw.get_storage_before_block(0, address, key).unwrap();
+       assert!(result.is_some(), "Should be able to read the changeset entry");
+       let entry = result.unwrap();
+       assert_eq!(entry.value, U256::from(42));
+   }
}
@@ -10,7 +10,10 @@ use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
-use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
+use reth_static_file_types::{
+   ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
+   SegmentRangeInclusive, StaticFileSegment,
+};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
    borrow::Borrow,
@@ -232,6 +235,10 @@ pub struct StaticFileProviderRW<N> {
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
    synced: bool,
+   /// Changeset offsets sidecar writer (only for changeset segments).
+   changeset_offsets: Option<ChangesetOffsetWriter>,
+   /// Current block's changeset offset being written.
+   current_changeset_offset: Option<ChangesetOffset>,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
@@ -246,6 +253,8 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;

+       // Create writer WITHOUT sidecar first - we'll add it after healing
        let mut writer = Self {
            writer,
            data_path,

@@ -254,10 +263,19 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            metrics,
            prune_on_commit: None,
            synced: false,
+           changeset_offsets: None,
+           current_changeset_offset: None,
        };

+       // Run NippyJar healing BEFORE setting up changeset sidecar
+       // This may reduce rows, which affects valid sidecar offsets
        writer.ensure_end_range_consistency()?;

+       // Now set up changeset sidecar with post-heal header values
+       if segment.is_change_based() {
+           writer.heal_changeset_sidecar()?;
+       }
+
        Ok(writer)
    }
@@ -349,7 +367,164 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        self.prune_on_commit.is_some()
    }

-   /// Syncs all data (rows and offsets) to disk.
+   /// Heals the changeset offset sidecar after `NippyJar` healing.
+   ///
+   /// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
+   /// Performs three-way consistency check between header, `NippyJar` rows, and sidecar file:
+   /// - Validates sidecar offsets don't point past actual `NippyJar` rows
+   /// - Heals header if sidecar was truncated during interrupted prune
+   /// - Truncates sidecar if offsets point past healed `NippyJar` data
+   fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
+       let csoff_path = self.data_path.with_extension("csoff");
+
+       // Step 1: Read all three sources of truth
+       let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
+       let actual_nippy_rows = self.writer.rows() as u64;
+
+       // Get actual sidecar file size (may differ from header after crash)
+       let actual_sidecar_blocks = if csoff_path.exists() {
+           let file_len =
+               reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
+           // Remove partial records from crash mid-write
+           let aligned_len = file_len - (file_len % 16);
+           aligned_len / 16
+       } else {
+           0
+       };
+
+       // Fresh segment or no sidecar data - nothing to heal
+       if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
+           self.changeset_offsets =
+               Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
+           return Ok(());
+       }
+
+       // Step 2: Validate sidecar offsets against actual NippyJar state
+       let valid_blocks = if actual_sidecar_blocks > 0 {
+           let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
+               .map_err(ProviderError::other)?;
+
+           // Find last block where offset + num_changes <= actual_nippy_rows
+           // This correctly handles rows=0 with offset=0, num_changes=0 (empty blocks)
+           let mut valid = 0u64;
+           for i in 0..actual_sidecar_blocks {
+               if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
+                   if offset.offset() + offset.num_changes() <= actual_nippy_rows {
+                       valid = i + 1;
+                   } else {
+                       // This block points past EOF - stop here
+                       break;
+                   }
+               }
+           }
+           valid
+       } else {
+           0
+       };
+
+       // Step 3: Determine correct state from synced files (source of truth)
+       // Header is the commit marker - never enlarge, only shrink
+       let correct_blocks = valid_blocks.min(header_claims_blocks);
+
+       // Step 4: Heal if header doesn't match validated truth
+       let mut needs_header_commit = false;
+
+       if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
+           tracing::warn!(
+               target: "reth::static_file",
+               path = %csoff_path.display(),
+               header_claims = header_claims_blocks,
+               sidecar_has = actual_sidecar_blocks,
+               valid_blocks = correct_blocks,
+               actual_rows = actual_nippy_rows,
+               "Three-way healing: syncing header, sidecar, and NippyJar state"
+           );
+
+           // Truncate sidecar file if it has invalid blocks
+           if actual_sidecar_blocks > correct_blocks {
+               use std::fs::OpenOptions;
+               let file = OpenOptions::new()
+                   .write(true)
+                   .open(&csoff_path)
+                   .map_err(ProviderError::other)?;
+               file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
+               file.sync_all().map_err(ProviderError::other)?;
+
+               tracing::debug!(
+                   target: "reth::static_file",
+                   "Truncated sidecar from {} to {} blocks",
+                   actual_sidecar_blocks,
+                   correct_blocks
+               );
+           }
+
+           // Update header to match validated truth (can only shrink, never enlarge)
+           if correct_blocks < header_claims_blocks {
+               // Blocks were removed - use prune() to update both block_range and
+               // changeset_offsets_len atomically
+               let blocks_removed = header_claims_blocks - correct_blocks;
+               self.writer.user_header_mut().prune(blocks_removed);
+
+               tracing::debug!(
+                   target: "reth::static_file",
+                   "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
+                   blocks_removed,
+                   header_claims_blocks,
+                   correct_blocks
+               );
+
+               needs_header_commit = true;
+           }
+       } else {
+           tracing::debug!(
+               target: "reth::static_file",
+               path = %csoff_path.display(),
+               blocks = correct_blocks,
+               "Changeset sidecar consistent, no healing needed"
+           );
+       }
+
+       // Open sidecar writer with corrected count (won't error now that sizes match)
+       let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
+           .map_err(ProviderError::other)?;
+
+       self.changeset_offsets = Some(csoff_writer);
+
+       // Commit healed header if needed (after sidecar writer is set up)
+       if needs_header_commit {
+           self.writer.commit().map_err(ProviderError::other)?;
+
+           tracing::info!(
+               target: "reth::static_file",
+               path = %csoff_path.display(),
+               blocks = correct_blocks,
+               "Committed healed changeset offset header"
+           );
+       }
+
+       Ok(())
+   }
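// Editorial sketch (not part of this diff): the healing decision above reduced to a
// pure function. Given the three observed values, the surviving block count is the
// number of leading in-bounds sidecar records, clamped by the committed header length.
fn healed_block_count(header_len: u64, nippy_rows: u64, sidecar_records: &[(u64, u64)]) -> u64 {
    // Count leading records whose rows all fall inside the jar: offset + count <= rows.
    let mut valid = 0u64;
    for &(offset, num_changes) in sidecar_records {
        if offset + num_changes <= nippy_rows {
            valid += 1;
        } else {
            break;
        }
    }
    // The header is the commit marker: healing may shrink it, never enlarge it.
    valid.min(header_len)
}
// E.g. header_len = 5, nippy_rows = 15, and 8 in-bounds records yield min(8, 5) = 5.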
+   /// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
+   ///
+   /// This is idempotent - safe to call multiple times. After flushing, the current offset
+   /// is cleared to prevent duplicate writes.
+   ///
+   /// This must be called before committing or syncing to ensure the last block's offset
+   /// is persisted, since `increment_block()` only writes the *previous* block's offset.
+   fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
+       if !self.writer.user_header().segment().is_change_based() {
+           return Ok(());
+       }
+
+       if let Some(offset) = self.current_changeset_offset.take() &&
+           let Some(writer) = &mut self.changeset_offsets
+       {
+           writer.append(&offset).map_err(ProviderError::other)?;
+       }
+       Ok(())
+   }
+
+   /// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
    ///
    /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
    /// configuration and mark the writer as clean.
@@ -359,6 +534,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }

+       // Write the final block's offset and sync the sidecar for changeset segments
+       self.flush_current_changeset_offset()?;
+       if let Some(writer) = &mut self.changeset_offsets {
+           writer.sync().map_err(ProviderError::other)?;
+           // Update the header with the actual number of offsets written
+           self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
+       }
+
        if self.writer.is_dirty() {
            self.writer.sync_all().map_err(ProviderError::other)?;
        }
@@ -383,7 +567,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        }

-       if self.writer.is_dirty() {
-           self.writer.sync_all().map_err(ProviderError::other)?;
+       if !self.synced {
+           // Must call self.sync_all() to flush changeset offsets and update
+           // the header's changeset_offsets_len, not just the inner writer
+           self.sync_all()?;
        }

        self.writer.finalize().map_err(ProviderError::other)?;
@@ -430,6 +616,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            }
        }

+       // For changeset segments, flush and sync the sidecar file before committing the main file.
+       // This ensures crash consistency: the sidecar is durable before the header references it.
+       self.flush_current_changeset_offset()?;
+       if let Some(writer) = &mut self.changeset_offsets {
+           writer.sync().map_err(ProviderError::other)?;
+           // Update the header with the actual number of offsets written
+           self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
+       }
+
        if self.writer.is_dirty() {
            debug!(
                target: "providers::static_file",
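// Durability ordering summary (editorial, restating the sequence above, not extra
// behavior): for change-based segments a commit proceeds as
//   1. flush_current_changeset_offset()  - the last block's record reaches the sidecar
//   2. changeset_offsets.sync()          - fsync the .csoff sidecar
//   3. set_changeset_offsets_len(len)    - header references only durable records
//   4. sync/commit the NippyJar          - data, offsets, and config follow
// A crash between steps 2 and 4 can leave the sidecar longer than the committed
// header claims; `heal_changeset_sidecar` clamps it back on the next open.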
@@ -573,7 +768,15 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        let (writer, data_path) =
            Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
        self.writer = writer;
-       self.data_path = data_path;
+       self.data_path = data_path.clone();
+
+       // Update changeset offsets writer for the new file (starts empty)
+       if segment.is_change_based() {
+           let csoff_path = data_path.with_extension("csoff");
+           self.changeset_offsets = Some(
+               ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
+           );
+       }

        *self.writer.user_header_mut() = SegmentHeader::new(
            self.reader().find_fixed_range(segment, last_block + 1),
@@ -585,6 +788,20 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        }

        self.writer.user_header_mut().increment_block();

+       // Handle changeset offset tracking for changeset segments
+       if segment.is_change_based() {
+           // Write previous block's offset if we have one
+           if let Some(offset) = self.current_changeset_offset.take() &&
+               let Some(writer) = &mut self.changeset_offsets
+           {
+               writer.append(&offset).map_err(ProviderError::other)?;
+           }
+           // Start tracking new block's offset
+           let new_offset = self.writer.rows() as u64;
+           self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
+       }
+
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
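// Worked example (editorial, not part of this diff): appending blocks with 3, 0,
// and 2 changes yields sidecar records (offset, num_changes) = (0, 3), (3, 0), (3, 2).
// Each `increment_block` persists the previous block's record and starts the next one
// at the current row count, so an empty block repeats the prior offset with a zero count.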
@@ -656,13 +873,6 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            expected_block_start = self.writer.user_header().expected_block_start();
        }

-       // Now we're in the correct file, we need to find how many rows to prune
-       // We need to iterate through the changesets to find the correct position
-       // Since changesets are stored per block, we need to find the offset for the block
-       let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| {
-           ProviderError::other(StaticFileWriterError::new("Missing changeset offsets"))
-       })?;
-
        // Find the number of rows to keep (up to and including last_block)
        let blocks_to_keep = if last_block >= expected_block_start {
            last_block - expected_block_start + 1
@@ -670,18 +880,30 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
            0
        };

+       // Read changeset offsets from sidecar file to find where to truncate
+       let csoff_path = self.data_path.with_extension("csoff");
+       let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
+
+       // Flush any pending changeset offset before reading the sidecar
+       self.flush_current_changeset_offset()?;
+
        let rows_to_keep = if blocks_to_keep == 0 {
            0
-       } else if blocks_to_keep as usize > changeset_offsets.len() {
-           // Keep all rows in this file (shouldn't happen if data is consistent)
-           self.writer.rows() as u64
-       } else if blocks_to_keep as usize == changeset_offsets.len() {
-           // Keep all rows
+       } else if blocks_to_keep >= changeset_offsets_len {
+           // Keep all rows in this file
            self.writer.rows() as u64
        } else {
-           // Find the offset for the block after last_block
-           // This gives us the number of rows to keep
-           changeset_offsets[blocks_to_keep as usize].offset()
+           // Read offset for the block after last_block from sidecar.
+           // Use committed length from header, ignoring any uncommitted records
+           // that may exist in the file after a crash.
+           let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
+               .map_err(ProviderError::other)?;
+           if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
+               next_offset.offset()
+           } else {
+               // If we can't read the offset, keep all rows
+               self.writer.rows() as u64
+           }
        };

        let total_rows = self.writer.rows() as u64;
@@ -709,6 +931,14 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
        // Sync changeset offsets to match the new block range
        self.writer.user_header_mut().sync_changeset_offsets();

+       // Truncate the sidecar file to match the new block count
+       if let Some(writer) = &mut self.changeset_offsets {
+           writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
+       }
+
+       // Clear current changeset offset tracking since we've pruned
+       self.current_changeset_offset = None;
+
        // Commits new changes to disk
        self.commit()?;
@@ -789,16 +1019,36 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
+       let segment = self.user_header().segment();
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
-           self.user_header().segment(),
+           segment,
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.writer.set_dirty();
-       self.data_path = data_path;
+       self.data_path = data_path.clone();
+
+       // Delete the sidecar file for changeset segments before deleting the main jar
+       if segment.is_change_based() {
+           let csoff_path = current_path.with_extension("csoff");
+           if csoff_path.exists() {
+               std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
+           }
+           // Re-initialize the changeset offsets writer for the previous file
+           let new_csoff_path = data_path.with_extension("csoff");
+           let committed_len = self.writer.user_header().changeset_offsets_len();
+           self.changeset_offsets = Some(
+               ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
+                   .map_err(ProviderError::other)?,
+           );
+       }
+
+       // Clear current changeset offset tracking since we're switching files
+       self.current_changeset_offset = None;
+
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
@@ -842,10 +1092,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {

    /// Appends change to changeset static file.
    fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
-       if self.writer.user_header().changeset_offsets().is_some() {
-           self.writer.user_header_mut().increment_block_changes();
+       if let Some(ref mut offset) = self.current_changeset_offset {
+           offset.increment_num_changes();
        }

        self.append_column(change)?;
        Ok(())
    }
@@ -0,0 +1,812 @@
//! Crash recovery tests for changeset offset healing.
//!
//! These tests verify the three-way consistency healing logic between:
//! 1. Header (`SegmentHeader.changeset_offsets_len`)
//! 2. `NippyJar` rows (actual row count)
//! 3. Sidecar file (.csoff)
//!
//! All tests use real `NippyJar` files via `StaticFileProvider` to test the full healing path.
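// Editorial sketch (not part of this diff): the invariant these tests assert after
// healing, written as a predicate. Header length, sidecar length, and record bounds
// must all agree, even when the jar holds zero rows.
fn healed_state_is_consistent(header_len: u64, sidecar_len: u64, rows: u64, records: &[(u64, u64)]) -> bool {
    header_len == sidecar_len &&
        records.len() as u64 == sidecar_len &&
        // Every record must point inside the jar, even when `rows == 0`.
        records.iter().all(|&(offset, count)| offset + count <= rows)
}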
#[cfg(test)]
mod tests {
    use alloy_primitives::{Address, U256};
    use reth_db::{models::AccountBeforeTx, test_utils::create_test_static_files_dir};
    use reth_primitives_traits::Account;
    use reth_static_file_types::{ChangesetOffset, ChangesetOffsetReader, StaticFileSegment};
    use std::{fs::OpenOptions, io::Write as _, path::PathBuf};

    use crate::providers::{
        static_file::manager::{StaticFileProviderBuilder, StaticFileWriter},
        StaticFileProvider,
    };
    use reth_chain_state::EthPrimitives;

    // ==================== HELPER FUNCTIONS ====================

    /// Creates a `StaticFileProvider` for testing with the given `blocks_per_file` setting.
    fn setup_test_provider(
        static_dir: &tempfile::TempDir,
        blocks_per_file: u64,
    ) -> StaticFileProvider<EthPrimitives> {
        StaticFileProviderBuilder::read_write(static_dir)
            .with_blocks_per_file(blocks_per_file)
            .build()
            .expect("Failed to build static file provider")
    }

    /// Generates test changeset data for a block.
    fn generate_test_changeset(block_num: u64, num_changes: usize) -> Vec<AccountBeforeTx> {
        (0..num_changes)
            .map(|i| {
                let mut address = Address::ZERO;
                address.0[0] = block_num as u8;
                address.0[1] = i as u8;
                AccountBeforeTx {
                    address,
                    info: Some(Account {
                        nonce: block_num,
                        balance: U256::from(block_num * 1000 + i as u64),
                        bytecode_hash: None,
                    }),
                }
            })
            .collect()
    }

    /// Writes test blocks to the `AccountChangeSets` segment.
    /// Returns the path to the sidecar file.
    fn write_test_blocks(
        provider: &StaticFileProvider<EthPrimitives>,
        num_blocks: u64,
        changes_per_block: usize,
    ) -> PathBuf {
        let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        for block_num in 0..num_blocks {
            let changeset = generate_test_changeset(block_num, changes_per_block);
            writer.append_account_changeset(changeset, block_num).unwrap();
        }

        writer.commit().unwrap();

        // Return path to sidecar
        get_sidecar_path(provider, 0)
    }

    /// Gets the .csoff sidecar path for a given block.
    fn get_sidecar_path(provider: &StaticFileProvider<EthPrimitives>, block: u64) -> PathBuf {
        let range = provider.find_fixed_range(StaticFileSegment::AccountChangeSets, block);
        let filename = StaticFileSegment::AccountChangeSets.filename(&range);
        provider.directory().join(filename).with_extension("csoff")
    }

    /// Reads the block count from a sidecar file (file size / 16).
    fn get_sidecar_block_count(path: &PathBuf) -> u64 {
        if !path.exists() {
            return 0;
        }
        let metadata = std::fs::metadata(path).unwrap();
        metadata.len() / 16
    }

    /// Appends a partial record to sidecar (simulates crash mid-write).
    fn corrupt_sidecar_partial_write(path: &PathBuf, partial_bytes: usize) {
        let mut file = OpenOptions::new().append(true).open(path).unwrap();
        file.write_all(&vec![0u8; partial_bytes]).unwrap();
        file.sync_all().unwrap();
    }

    /// Appends fake blocks to sidecar that point past actual `NippyJar` rows.
    fn corrupt_sidecar_add_fake_blocks(path: &PathBuf, num_fake_blocks: u64, start_offset: u64) {
        let mut file = OpenOptions::new().append(true).open(path).unwrap();
        for i in 0..num_fake_blocks {
            let offset = ChangesetOffset::new(start_offset + i * 5, 5);
            let mut buf = [0u8; 16];
            buf[..8].copy_from_slice(&offset.offset().to_le_bytes());
            buf[8..].copy_from_slice(&offset.num_changes().to_le_bytes());
            file.write_all(&buf).unwrap();
        }
        file.sync_all().unwrap();
    }

    /// Truncates the sidecar file to a specific number of blocks.
    fn truncate_sidecar(path: &PathBuf, num_blocks: u64) {
        let file = OpenOptions::new().write(true).open(path).unwrap();
        file.set_len(num_blocks * 16).unwrap();
        file.sync_all().unwrap();
    }

    /// Reads the `changeset_offsets_len` from the segment header.
    fn get_header_block_count(provider: &StaticFileProvider<EthPrimitives>, block: u64) -> u64 {
        let jar_provider = provider
            .get_segment_provider_for_block(StaticFileSegment::AccountChangeSets, block, None)
            .unwrap();
        jar_provider.user_header().changeset_offsets_len()
    }

    /// Gets the actual row count from `NippyJar`.
    fn get_nippy_row_count(provider: &StaticFileProvider<EthPrimitives>, block: u64) -> u64 {
        let jar_provider = provider
            .get_segment_provider_for_block(StaticFileSegment::AccountChangeSets, block, None)
            .unwrap();
        jar_provider.rows() as u64
    }
    // ==================== APPEND CRASH SCENARIOS ====================

    #[test]
    fn test_append_crash_partial_sidecar_record() {
        // SCENARIO: Crash mid-write of a 16-byte sidecar record.
        // State after crash: Sidecar has N complete records + partial bytes.
        // Expected: Healing truncates partial bytes, keeps N complete records.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 5 blocks with 3 changes each
        let sidecar_path = write_test_blocks(&provider, 5, 3);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 5);

        // Corrupt: append partial record (8 of 16 bytes)
        corrupt_sidecar_partial_write(&sidecar_path, 8);
        assert_eq!(
            std::fs::metadata(&sidecar_path).unwrap().len(),
            5 * 16 + 8,
            "Should have 5 records + 8 partial bytes"
        );

        // Reopen provider - triggers healing
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);

        // Verify healing truncated partial record
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Should have 5 complete records");
        assert_eq!(
            std::fs::metadata(&sidecar_path).unwrap().len(),
            5 * 16,
            "File should be exactly 80 bytes"
        );
    }

    #[test]
    fn test_append_crash_sidecar_synced_header_not_committed() {
        // SCENARIO: Sidecar was synced with new blocks, but header commit crashed.
        // State after crash: Sidecar has more blocks than header claims.
        // Expected: Healing clamps to header value (never enlarges header).

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 5 blocks
        let sidecar_path = write_test_blocks(&provider, 5, 3);
        let total_rows = 5 * 3; // 15 rows

        // Corrupt: add 3 fake blocks to sidecar (simulates sidecar sync before header commit)
        // These blocks point to valid rows (within the 15 rows we have)
        // But header doesn't know about them
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, total_rows as u64);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar should have 8 blocks");

        // Reopen - healing should clamp to header's 5 blocks
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        // Header is authoritative - sidecar should be truncated to 5
        assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Should clamp to header value");
        assert_eq!(get_header_block_count(&provider, 0), 5);
    }

    #[test]
    fn test_append_crash_sidecar_ahead_of_nippy_offsets() {
        // APPEND CP2: After data sync, before offsets sync.
        // SCENARIO: Sidecar was synced first (has new blocks), data file has new rows,
        // but NippyJar offsets file is stale. Config is stale.
        //
        // We can't easily simulate NippyJar internal offset mismatch, but we CAN test
        // that sidecar entries are validated against actual_nippy_rows (from NippyJar.rows()).
        // If sidecar claims blocks that would exceed NippyJar's row count, healing truncates.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 5 blocks with 3 changes each (15 rows total)
        let sidecar_path = write_test_blocks(&provider, 5, 3);
        assert_eq!(get_nippy_row_count(&provider, 0), 15);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 5);

        // Corrupt sidecar: add 3 fake blocks that claim to point to rows beyond NippyJar's 15
        // Block 6: offset=15, count=3 (rows 15-17, but only 15 rows exist!)
        // Block 7: offset=18, count=3 (rows 18-20, invalid)
        // Block 8: offset=21, count=3 (rows 21-23, invalid)
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 15);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 8);

        // Reopen - healing should detect sidecar offsets point past actual NippyJar rows
        // and truncate back. Since header is 5, healing clamps to min(8, 5) = 5.
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        // Sidecar should be clamped to header's 5 blocks
        assert_eq!(get_sidecar_block_count(&sidecar_path), 5);
        assert_eq!(get_header_block_count(&provider, 0), 5);
        // NippyJar rows unchanged
        assert_eq!(get_nippy_row_count(&provider, 0), 15);
    }

    #[test]
    fn test_append_clean_no_crash() {
        // BASELINE: Normal append with no crash.
        // All three sources should be in sync.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks with 5 changes each
        let sidecar_path = write_test_blocks(&provider, 10, 5);

        // Verify all in sync
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        assert_eq!(get_header_block_count(&provider, 0), 10);
        assert_eq!(get_nippy_row_count(&provider, 0), 50); // 10 blocks * 5 changes

        // Reopen multiple times - should remain stable
        drop(provider);
        for _ in 0..3 {
            let provider = setup_test_provider(&static_dir, 100);
            let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        }
    }
    // ==================== PRUNE CRASH SCENARIOS ====================

    #[test]
    fn test_prune_crash_sidecar_truncated_header_stale() {
        // SCENARIO: Prune truncated sidecar but crashed before header commit.
        // State after crash: Sidecar has fewer blocks than header claims.
        // Expected: Healing updates header to match sidecar.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks
        let sidecar_path = write_test_blocks(&provider, 10, 5);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);

        // Simulate prune crash: truncate sidecar to 7 blocks but don't update header
        // (In real crash, header would still claim 10 blocks)
        truncate_sidecar(&sidecar_path, 7);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7);

        // Reopen - healing should detect sidecar < header and fix header
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        {
            let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            // Writer commits healed header on open
        }
        // Drop writer and reopen provider to see committed changes
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);

        // After healing, header should match sidecar
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7);
        assert_eq!(get_header_block_count(&provider, 0), 7);
    }

    #[test]
    fn test_prune_crash_sidecar_offsets_past_nippy_rows() {
        // SCENARIO: NippyJar was pruned but sidecar wasn't truncated.
        // State after crash: Sidecar has offsets pointing past actual NippyJar rows.
        // Expected: Healing validates offsets and truncates invalid blocks.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks with 5 changes each (50 rows total)
        let sidecar_path = write_test_blocks(&provider, 10, 5);

        // Verify initial state
        assert_eq!(get_nippy_row_count(&provider, 0), 50);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);

        // Now add fake blocks to sidecar that point past row 50
        // These simulate a crash where sidecar wasn't cleaned up after NippyJar prune
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 5, 50); // Blocks pointing to rows 50-74
        assert_eq!(get_sidecar_block_count(&sidecar_path), 15);

        // Reopen - healing should detect invalid offsets and truncate
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        // Should be back to 10 valid blocks (offsets pointing within 50 rows)
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        assert_eq!(get_header_block_count(&provider, 0), 10);
    }

    #[test]
    fn test_prune_crash_nippy_offsets_truncated_data_stale() {
        // PRUNE CP1-3: Tests healing when sidecar has offsets past NippyJar.rows().
        // NippyJar heals internally, then changeset healing validates against rows().
        // Verifies healing uses actual_nippy_rows as source of truth.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks with varying changes
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            for block_num in 0..10 {
                let changes = (block_num % 5 + 1) as usize; // 1-5 changes per block
                writer
                    .append_account_changeset(
                        generate_test_changeset(block_num, changes),
                        block_num,
                    )
                    .unwrap();
            }
            writer.commit().unwrap();
        }

        let sidecar_path = get_sidecar_path(&provider, 0);
        let actual_rows = get_nippy_row_count(&provider, 0);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);

        // Simulate a scenario where sidecar has blocks pointing past actual rows.
        // Add 2 fake blocks that would exceed the row count.
        // Block 11: offset=actual_rows, count=5 (pointing past EOF)
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 2, actual_rows);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 12);

        // Reopen - healing validates all offsets against NippyJar.rows()
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        // After healing: sidecar clamped to header (10), rows unchanged
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Sidecar clamped to header");
        assert_eq!(get_header_block_count(&provider, 0), 10);
        assert_eq!(get_nippy_row_count(&provider, 0), actual_rows, "NippyJar rows unchanged");
    }

    #[test]
    fn test_prune_crash_sidecar_truncate_not_synced() {
        // PRUNE CP6: Sidecar truncated but not synced (power loss could resurrect old length).
        // SCENARIO: We pruned from 10 to 7 blocks. Header was updated to 7.
        // Sidecar was truncated to 7 but fsync didn't complete before power loss.
        // After restart, filesystem resurrects sidecar back to 10 (old length).
        //
        // State after crash: Header says 7, sidecar shows 10.
        // Expected: Healing clamps sidecar to header's 7 (header is authoritative).

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks with 5 changes each
        let sidecar_path = write_test_blocks(&provider, 10, 5);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        assert_eq!(get_header_block_count(&provider, 0), 10);

        // Prune to 7 blocks (keep blocks 0-6)
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            writer.prune_account_changesets(6).unwrap();
            writer.commit().unwrap();
        }

        // Verify prune worked
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        assert_eq!(get_header_block_count(&provider, 0), 7);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7);

        // Now write 3 more blocks (back to 10 total)
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            for block_num in 7..10 {
                let changeset = generate_test_changeset(block_num, 5);
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();
        }
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        assert_eq!(get_header_block_count(&provider, 0), 10);

        // Prune back to 7 again
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            writer.prune_account_changesets(6).unwrap();
            writer.commit().unwrap();
        }
        drop(provider);

        // SIMULATE POWER LOSS: Restore sidecar to 10 blocks (as if truncate wasn't synced)
        // Extend sidecar back to 10 blocks to simulate "resurrected" state
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 35);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Simulated resurrected sidecar");

        // Reopen - healing should detect sidecar (10) > header (7) and clamp
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        // Header is authoritative - sidecar must be clamped to 7
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7, "Sidecar clamped to header");
        assert_eq!(get_header_block_count(&provider, 0), 7, "Header unchanged");
    }

    #[test]
    fn test_prune_with_unflushed_current_offset() {
        // REGRESSION (Issue #7): truncate_changesets() didn't call flush_current_changeset_offset()
        // before reading the sidecar. This caused incorrect truncation when there was an unflushed
        // current_changeset_offset from recent appends.
        //
        // Test scenario: Prune immediately after appending without committing first.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        let sidecar_path = get_sidecar_path(&provider, 0);

        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            // Step 1: Write 5 blocks (0-4), commit
            for block_num in 0..5u64 {
                let changeset = generate_test_changeset(block_num, 3);
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            writer.commit().unwrap();

            // Step 2: Append 5 more blocks (5-9) WITHOUT committing
            for block_num in 5..10u64 {
                let changeset = generate_test_changeset(block_num, 3);
                writer.append_account_changeset(changeset, block_num).unwrap();
            }
            // Note: NOT committing here - block 9's offset is only in current_changeset_offset

            // Step 3: Prune to block 7 (should keep blocks 0-7, including uncommitted 5-7)
            // This tests that the unflushed current_changeset_offset (for block 9) is properly
            // flushed before reading the sidecar during truncation.
            writer.prune_account_changesets(7).unwrap();

            // Step 4: Commit
            writer.commit().unwrap();
        }

        // Step 5: Verify sidecar has 8 blocks, header has 8 blocks, rows are correct
        assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar should have 8 blocks (0-7)");
        assert_eq!(get_header_block_count(&provider, 0), 8, "Header should have 8 blocks");
        assert_eq!(
            get_nippy_row_count(&provider, 0),
            24,
            "Should have 8 blocks * 3 changes = 24 rows"
        );

        // Verify stability after reopen
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        assert_eq!(get_sidecar_block_count(&sidecar_path), 8, "Sidecar stable after reopen");
        assert_eq!(get_header_block_count(&provider, 0), 8, "Header stable after reopen");
        assert_eq!(get_nippy_row_count(&provider, 0), 24, "Rows stable after reopen");
    }

    #[test]
    fn test_prune_clean_no_crash() {
        // BASELINE: Normal prune with no crash.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks
        write_test_blocks(&provider, 10, 5);

        // Prune to keep blocks 0-6 (7 blocks total, removing blocks 7-9)
        // prune_account_changesets takes last_block to keep, not count to remove
        let sidecar_path = get_sidecar_path(&provider, 0);
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            writer.prune_account_changesets(6).unwrap(); // Keep blocks 0-6
            writer.commit().unwrap();
        }

        // Reopen provider to see committed changes
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);

        // Verify all in sync at 7 blocks (0-6)
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7);
        assert_eq!(get_header_block_count(&provider, 0), 7);

        // Rows should also be reduced (7 blocks * 5 changes = 35 rows)
        assert_eq!(get_nippy_row_count(&provider, 0), 35);
    }
// ==================== VALIDATION EDGE CASES ====================
|
||||
|
||||
#[test]
|
||||
fn test_empty_segment_fresh_start() {
|
||||
// SCENARIO: Brand new segment with no data.
|
||||
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let provider = setup_test_provider(&static_dir, 100);
|
||||
|
||||
// Just open a writer without writing anything
|
||||
{
|
||||
let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
|
||||
}
|
||||
|
||||
// Sidecar might not exist or be empty
|
||||
let sidecar_path = get_sidecar_path(&provider, 0);
|
||||
let block_count = get_sidecar_block_count(&sidecar_path);
|
||||
assert_eq!(block_count, 0, "Fresh segment should have 0 blocks");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_empty_blocks_preserved_on_reopen() {
|
||||
// REGRESSION: When all blocks have empty changesets (0 rows in NippyJar),
|
||||
// healing incorrectly pruned all blocks because the validation was skipped
|
||||
// when actual_nippy_rows == 0. But (offset=0, num_changes=0) is valid when rows=0.
|
||||
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let provider = setup_test_provider(&static_dir, 100);
|
||||
|
||||
// Write 5 blocks with 0 changes each => 0 total rows in NippyJar
|
||||
let sidecar_path = write_test_blocks(&provider, 5, 0);
|
||||
assert_eq!(get_sidecar_block_count(&sidecar_path), 5);
|
||||
assert_eq!(get_header_block_count(&provider, 0), 5);
|
||||
assert_eq!(get_nippy_row_count(&provider, 0), 0);
|
||||
|
||||
// Reopen - healing must preserve all 5 blocks
|
||||
drop(provider);
|
||||
let provider = setup_test_provider(&static_dir, 100);
|
||||
let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
|
||||
|
||||
// Must still have 5 blocks after healing
|
||||
assert_eq!(
|
||||
get_sidecar_block_count(&sidecar_path),
|
||||
5,
|
||||
"Healing should not prune empty blocks"
|
||||
);
|
||||
assert_eq!(get_header_block_count(&provider, 0), 5);
|
||||
assert_eq!(get_nippy_row_count(&provider, 0), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_blocks_zero_changes() {
|
||||
// SCENARIO: Some blocks have 0 changes (empty changesets).
|
||||
|
||||
let (static_dir, _) = create_test_static_files_dir();
|
||||
let provider = setup_test_provider(&static_dir, 100);
|
||||
|
||||
{
|
||||
let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
|
||||
|
||||
// Block 0: 3 changes
|
||||
writer.append_account_changeset(generate_test_changeset(0, 3), 0).unwrap();
|
||||
|
||||
// Block 1: 0 changes (empty)
|
||||
writer.append_account_changeset(vec![], 1).unwrap();
|
||||
|
||||
// Block 2: 2 changes
|
||||
writer.append_account_changeset(generate_test_changeset(2, 2), 2).unwrap();
|
||||
|
||||
// Block 3: 0 changes (empty)
|
||||
writer.append_account_changeset(vec![], 3).unwrap();
|
||||
|
||||
// Block 4: 5 changes
|
||||
writer.append_account_changeset(generate_test_changeset(4, 5), 4).unwrap();
|
||||
|
||||
writer.commit().unwrap();
|
||||
}
|
||||
|
||||
let sidecar_path = get_sidecar_path(&provider, 0);
|
||||
|
||||
// Verify 5 blocks recorded
|
||||
assert_eq!(get_sidecar_block_count(&sidecar_path), 5);
|
||||
assert_eq!(get_header_block_count(&provider, 0), 5);
|
||||
|
||||
// Verify offsets are correct
|
||||
let mut reader = ChangesetOffsetReader::new(&sidecar_path, 5).unwrap();
|
||||
|
||||
let o0 = reader.get(0).unwrap().unwrap();
|
||||
assert_eq!(o0.offset(), 0);
|
||||
assert_eq!(o0.num_changes(), 3);
|
||||
|
||||
let o1 = reader.get(1).unwrap().unwrap();
|
||||
assert_eq!(o1.offset(), 3);
|
||||
assert_eq!(o1.num_changes(), 0); // Empty block
|
||||
|
||||
let o2 = reader.get(2).unwrap().unwrap();
|
||||
assert_eq!(o2.offset(), 3); // Same offset as block 1 (0 changes didn't advance)
|
||||
assert_eq!(o2.num_changes(), 2);
|
||||
|
||||
let o3 = reader.get(3).unwrap().unwrap();
|
||||
assert_eq!(o3.offset(), 5);
|
||||
assert_eq!(o3.num_changes(), 0); // Empty block
|
||||
|
||||
let o4 = reader.get(4).unwrap().unwrap();
|
||||
assert_eq!(o4.offset(), 5);
|
||||
assert_eq!(o4.num_changes(), 5);
|
||||
|
||||
// Total rows: 3 + 0 + 2 + 0 + 5 = 10
|
||||
assert_eq!(get_nippy_row_count(&provider, 0), 10);
|
||||
|
||||
// Reopen and verify healing doesn't break empty blocks
|
||||
drop(provider);
|
||||
let provider = setup_test_provider(&static_dir, 100);
|
||||
let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
|
||||
assert_eq!(get_sidecar_block_count(&sidecar_path), 5);
|
||||
}
|
||||
|
||||
    #[test]
    fn test_healing_never_enlarges_header() {
        // INVARIANT: The header is the commit marker. Healing should NEVER enlarge it.
        // Even if the sidecar has more valid blocks, we trust the header.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 5 blocks
        let sidecar_path = write_test_blocks(&provider, 5, 3);

        // Add more blocks to the sidecar that would be "valid" (they point within
        // existing rows). These simulate uncommitted blocks from a crashed append.
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 0);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 8);

        // Reopen - healing should clamp to the header's 5, not enlarge to 8
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        assert_eq!(get_sidecar_block_count(&sidecar_path), 5, "Must clamp to header");
        assert_eq!(get_header_block_count(&provider, 0), 5, "Header unchanged");
    }

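    // Sketch of the clamp exercised above and by the prune-crash regression below
    // (illustrative; assumes both inputs are plain block counts): whichever side is
    // ahead after a crash, healing converges on the smaller of the two.
    #[allow(dead_code)]
    fn healed_block_count_sketch(sidecar_blocks: u64, header_blocks: u64) -> u64 {
        // Sidecar ahead (crashed append): clamp the sidecar down to the header.
        // Header ahead (crashed prune): heal the header down to the sidecar.
        sidecar_blocks.min(header_blocks)
    }
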
    #[test]
    fn test_multiple_reopen_cycles_stable() {
        // STABILITY: Opening and closing multiple times shouldn't change anything.

        let (static_dir, _) = create_test_static_files_dir();

        // Initial write
        {
            let provider = setup_test_provider(&static_dir, 100);
            write_test_blocks(&provider, 10, 5);
        }

        // Reopen 5 times
        for i in 0..5 {
            let provider = setup_test_provider(&static_dir, 100);
            let sidecar_path = get_sidecar_path(&provider, 0);

            let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

            assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Cycle {}: block count", i);
            assert_eq!(get_header_block_count(&provider, 0), 10, "Cycle {}: header", i);
            assert_eq!(get_nippy_row_count(&provider, 0), 50, "Cycle {}: rows", i);
        }
    }

    #[test]
    fn test_combined_partial_and_extra_blocks() {
        // COMBINED: Partial record AND extra complete blocks.
        // Healing should handle both: truncate the partial record, then validate the rest.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 5 blocks
        let sidecar_path = write_test_blocks(&provider, 5, 3);

        // Corrupt: add 2 fake blocks pointing past EOF, then a partial record
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 2, 100); // Invalid offsets
        corrupt_sidecar_partial_write(&sidecar_path, 10); // Partial record

        let file_size = std::fs::metadata(&sidecar_path).unwrap().len();
        assert_eq!(file_size, 5 * 16 + 2 * 16 + 10, "5 valid + 2 fake + 10 partial");

        // Reopen - healing should:
        // 1. Truncate the partial record (10 bytes)
        // 2. Clamp to the header's 5 blocks (remove the 2 fake blocks)
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();

        assert_eq!(get_sidecar_block_count(&sidecar_path), 5);
        assert_eq!(get_header_block_count(&provider, 0), 5);
    }

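    // Sketch of the two-step repair implied above (illustrative; assumes the 16-byte
    // record layout from `encode_sidecar_record_sketch`): drop any trailing partial
    // record first, then clamp whole records to the header's committed block count.
    // healed_sidecar_len_sketch(5 * 16 + 2 * 16 + 10, 5) == 5 * 16.
    #[allow(dead_code)]
    fn healed_sidecar_len_sketch(file_len: u64, header_blocks: u64) -> u64 {
        const RECORD_SIZE: u64 = 16; // assumed record size
        let whole_records = file_len / RECORD_SIZE; // integer division drops the partial tail
        whole_records.min(header_blocks) * RECORD_SIZE // never keep more than the header claims
    }
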
    // ==================== REGRESSION TESTS ====================

    #[test]
    fn test_prune_double_decrement_regression() {
        // REGRESSION: Previously, healing called set_changeset_offsets_len() then prune(),
        // causing a double decrement. Now we only call prune(), which handles both.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Write 10 blocks
        let sidecar_path = write_test_blocks(&provider, 10, 5);

        // Simulate a prune crash: sidecar at 7, header should be healed from 10 to 7
        truncate_sidecar(&sidecar_path, 7);

        // Reopen - healing fixes the header
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        {
            let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            // Writer commits the healed header on open
        }
        // Reopen the provider to see the committed changes
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);

        // Should be exactly 7, not 7 - (10 - 7) = 4 (the double-decrement bug)
        assert_eq!(get_header_block_count(&provider, 0), 7);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 7);
    }

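    // Sketch of the regression above (hypothetical pseudo-logic, not the real writer):
    // the buggy path shrank the length twice — once via set_changeset_offsets_len() and
    // again inside prune() — while the fixed path lets prune() own the whole adjustment.
    #[allow(dead_code)]
    fn double_decrement_bug_sketch(header_blocks: u64, sidecar_blocks: u64) -> (u64, u64) {
        let delta = header_blocks - sidecar_blocks; // e.g. 10 - 7 = 3
        let buggy = sidecar_blocks - delta; // 7 - 3 = 4: decremented twice
        let fixed = sidecar_blocks; // prune() alone lands on 7
        (buggy, fixed)
    }
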
    #[test]
    fn test_prune_with_uncommitted_sidecar_records() {
        // REGRESSION: truncate_changesets() previously read the file size from disk instead
        // of using the committed length from the header. After a crash, the sidecar may have
        // uncommitted records. The fix uses ChangesetOffsetReader::new() with an explicit
        // length.
        //
        // SCENARIO: Simulate a crash where the sidecar has more records than the header
        // claims, then prune. The prune should use the header's block count, not the
        // sidecar's.

        let (static_dir, _) = create_test_static_files_dir();
        let provider = setup_test_provider(&static_dir, 100);

        // Step 1: Write 10 blocks with 5 changes each, commit
        let sidecar_path = write_test_blocks(&provider, 10, 5);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10);
        assert_eq!(get_header_block_count(&provider, 0), 10);
        assert_eq!(get_nippy_row_count(&provider, 0), 50);

        // Step 2: Corrupt the sidecar by adding 3 fake blocks that point to valid but
        // uncommitted offsets. This simulates a crash where the sidecar was synced but
        // the header wasn't committed.
        corrupt_sidecar_add_fake_blocks(&sidecar_path, 3, 50);
        assert_eq!(get_sidecar_block_count(&sidecar_path), 13, "Sidecar has 3 uncommitted blocks");

        // Step 3: Reopen the provider - healing runs and clamps the sidecar to the header's 10
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);
        {
            let _writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
        }
        assert_eq!(get_sidecar_block_count(&sidecar_path), 10, "Healing clamped sidecar to 10");
        assert_eq!(get_header_block_count(&provider, 0), 10);

        // Step 4: Prune to block 6 (keep blocks 0-6, remove 7-9)
        {
            let mut writer = provider.latest_writer(StaticFileSegment::AccountChangeSets).unwrap();
            writer.prune_account_changesets(6).unwrap();
            writer.commit().unwrap();
        }

        // Step 5: Verify the sidecar has 7 blocks, the header has 7 blocks, and the rows match
        drop(provider);
        let provider = setup_test_provider(&static_dir, 100);

        assert_eq!(get_sidecar_block_count(&sidecar_path), 7, "Sidecar should have 7 blocks");
        assert_eq!(get_header_block_count(&provider, 0), 7, "Header should have 7 blocks");
        assert_eq!(
            get_nippy_row_count(&provider, 0),
            35,
            "Should have 7 blocks * 5 changes = 35 rows"
        );
    }

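    // Sketch of the prune arithmetic asserted in Step 5 (assumes the test's uniform 5
    // changes per block): pruning to block N keeps blocks 0..=N, so the expected row
    // count is (N + 1) * changes_per_block. rows_after_prune_sketch(6, 5) == 35.
    #[allow(dead_code)]
    fn rows_after_prune_sketch(prune_to_block: u64, changes_per_block: u64) -> u64 {
        (prune_to_block + 1) * changes_per_block
    }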
}