mirror of
https://github.com/ChainSafe/lodestar.git
synced 2026-01-08 15:23:57 -05:00
feat: add Era File Reading and Writing (#8035)
**Motivation**
- tracks #7048
-
[era](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md)
specs.
**Description**
- Adds functionality to read/write to
[e2s](613f4a9a50/docs/e2store.md (era-files))
files
```ts
import {open} from "node:fs/promises";
import {e2s} from "@lodestar/era";
const fh = await open("mainnet-xxxxxx-xxxxxxxx.era");
const entry = await e2s.readEntry(fh, 0);
entry.type == e2s.EntryType.Version
```
- Adds functionality to read/write era files
```ts
import {era} from "@lodestar/era";
import {config} from "@lodestar/config/default";
// open reader
const reader = await era.EraReader.open(config, "mainnet-xxxxx-xxxxxxxx.era");
// check number of groups
reader.groups.length === 1;
// read blocks
const slot = reader.groups[0].startSlot;
// return snappy-frame compressed, ssz-serialized block at slot or null if a skip slot
// throws if out of range
await reader.readCompressedBlock(slot);
// same, but for ssz-serialized block
await reader.readSerializedBlock(slot);
// same but for deserialized block
await reader.readBlock(slot);
// read state(s), one per group
// similar api to blocks, but with an _optional_ eraNumber param for specifying which group's state to read
await reader.readCompressedState();
await reader.readSerializedState();
await reader.readState();
// write era files
const writer = await era.EraWriter.create(config, "path/to/era");
// similar api to reader, can write compressed, serialized, or deserialized items
// first write all blocks for the era
await writer.writeBlock(block);
// ...
// then write the state
await writer.writeState(state);
// if applicable, continue writing eras of blocks and state (an era file can contain multiple eras, or "groups" as the spec states)
// when finished, must call `finish`, which will close the file handler and _rename_ the file to the spec-compliant name
await writer.finish();
```
- e2e test reads an era file, does all validation, writes an era fila,
does validation on that freshly created file
- requires the era file fixture to be downloaded (`cd packages/era/test
&& ./download_era_file.sh`)
- e2e test is skipped (`test:e2e` is not defined for the era package)
---------
Co-authored-by: Cayman <caymannava@gmail.com>
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -29,6 +29,7 @@ validators
|
||||
.vscode/launch.json
|
||||
!.vscode/settings.json
|
||||
.vscode/tasks.json
|
||||
.claude/
|
||||
|
||||
# Tests artifacts
|
||||
packages/*/spec-tests*
|
||||
|
||||
1
packages/era/.gitignore
vendored
Normal file
1
packages/era/.gitignore
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.era
|
||||
201
packages/era/LICENSE
Normal file
201
packages/era/LICENSE
Normal file
@@ -0,0 +1,201 @@
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
76
packages/era/README.md
Normal file
76
packages/era/README.md
Normal file
@@ -0,0 +1,76 @@
|
||||
# `@lodestar/era`
|
||||
|
||||
> This package is part of [ChainSafe's Lodestar](https://lodestar.chainsafe.io) project
|
||||
|
||||
## Usage
|
||||
|
||||
This package provides functionality to read and write [era files](https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md), which are based on the [e2store format](https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md#introduction).
|
||||
|
||||
### Reading/Writing e2s files
|
||||
|
||||
```ts
|
||||
import {open} from "node:fs/promises";
|
||||
import {e2s} from "@lodestar/era";
|
||||
|
||||
const fh = await open("mainnet-xxxxxx-xxxxxxxx.era");
|
||||
const entry = await e2s.readEntry(fh, 0);
|
||||
entry.type == e2s.EntryType.Version;
|
||||
```
|
||||
|
||||
### Reading era files
|
||||
|
||||
```ts
|
||||
import {era} from "@lodestar/era";
|
||||
import {config} from "@lodestar/config/default";
|
||||
|
||||
// open reader
|
||||
const reader = await era.EraReader.open(config, "mainnet-xxxxx-xxxxxxxx.era");
|
||||
|
||||
// check number of groups
|
||||
reader.groups.length === 1;
|
||||
|
||||
// read blocks
|
||||
const slot = reader.groups[0].blocksIndex?.startSlot ?? 0;
|
||||
|
||||
// return snappy-frame compressed, ssz-serialized block at slot or null if a skip slot
|
||||
// throws if out of range
|
||||
await reader.readCompressedBlock(slot);
|
||||
// same, but for ssz-serialized block
|
||||
await reader.readSerializedBlock(slot);
|
||||
// same but for deserialized block
|
||||
await reader.readBlock(slot);
|
||||
|
||||
// read state(s), one per group
|
||||
// similar api to blocks, but with an optional eraNumber param for specifying which group's state to read
|
||||
await reader.readCompressedState();
|
||||
await reader.readSerializedState();
|
||||
await reader.readState();
|
||||
```
|
||||
|
||||
### Writing era files
|
||||
|
||||
```ts
|
||||
import {era} from "@lodestar/era";
|
||||
import {config} from "@lodestar/config/default";
|
||||
import {SignedBeaconBlock, BeaconState} from "@lodestar/types";
|
||||
|
||||
const writer = await era.EraWriter.create(config, "path/to/era", 0);
|
||||
|
||||
// similar api to reader, can write compressed, serialized, or deserialized items
|
||||
// first write all blocks for the era
|
||||
// Assuming `block` is a SignedBeaconBlock
|
||||
declare const block: SignedBeaconBlock;
|
||||
await writer.writeBlock(block);
|
||||
// ...
|
||||
// then write the state
|
||||
// Assuming `state` is a BeaconState
|
||||
declare const state: BeaconState;
|
||||
await writer.writeState(state);
|
||||
// if applicable, continue writing eras of blocks and state (an era file can contain multiple eras, or "groups" as the spec states)
|
||||
// when finished, must call `finish`, which will close the file handler and rename the file to the spec-compliant name
|
||||
await writer.finish();
|
||||
```
|
||||
|
||||
## License
|
||||
|
||||
Apache-2.0 [ChainSafe Systems](https://chainsafe.io)
|
||||
49
packages/era/package.json
Normal file
49
packages/era/package.json
Normal file
@@ -0,0 +1,49 @@
|
||||
{
|
||||
"name": "@lodestar/era",
|
||||
"description": "Era file handling module for Lodestar",
|
||||
"license": "Apache-2.0",
|
||||
"author": "ChainSafe Systems",
|
||||
"homepage": "https://github.com/ChainSafe/lodestar#readme",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "git+https://github.com/ChainSafe/lodestar.git"
|
||||
},
|
||||
"bugs": {
|
||||
"url": "https://github.com/ChainSafe/lodestar/issues"
|
||||
},
|
||||
"version": "1.36.0",
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"bun": "./src/index.ts",
|
||||
"types": "./lib/index.d.ts",
|
||||
"import": "./lib/index.js"
|
||||
}
|
||||
},
|
||||
"files": [
|
||||
"src",
|
||||
"lib",
|
||||
"!**/*.tsbuildinfo"
|
||||
],
|
||||
"scripts": {
|
||||
"clean": "rm -rf lib && rm -f *.tsbuildinfo",
|
||||
"build": "tsc -p tsconfig.build.json",
|
||||
"build:watch": "yarn run build --watch",
|
||||
"build:release": "yarn clean && yarn run build",
|
||||
"check-build": "node -e \"(async function() { await import('./lib/index.js') })()\"",
|
||||
"check-types": "tsc",
|
||||
"lint": "biome check src/ test/",
|
||||
"lint:fix": "yarn run lint --write",
|
||||
"test": "yarn test:unit",
|
||||
"test:unit": "vitest run --project unit --project unit-minimal",
|
||||
"check-readme": "typescript-docs-verifier"
|
||||
},
|
||||
"dependencies": {
|
||||
"@chainsafe/blst": "^2.2.0",
|
||||
"@lodestar/config": "^1.36.0",
|
||||
"@lodestar/params": "^1.36.0",
|
||||
"@lodestar/reqresp": "^1.36.0",
|
||||
"@lodestar/types": "^1.36.0",
|
||||
"uint8arraylist": "^2.4.7"
|
||||
}
|
||||
}
|
||||
178
packages/era/src/e2s.ts
Normal file
178
packages/era/src/e2s.ts
Normal file
@@ -0,0 +1,178 @@
|
||||
import type {FileHandle} from "node:fs/promises";
|
||||
import {Slot} from "@lodestar/types";
|
||||
import {readInt48, readUint16, readUint32, writeInt48, writeUint16, writeUint32} from "./util.ts";
|
||||
|
||||
/**
|
||||
* Known entry types in an E2Store (.e2s) file along with their exact 2-byte codes.
|
||||
*/
|
||||
export enum EntryType {
|
||||
Empty = 0,
|
||||
CompressedSignedBeaconBlock = 1,
|
||||
CompressedBeaconState = 2,
|
||||
Version = 0x65 | (0x32 << 8), // "e2" in ASCII
|
||||
SlotIndex = 0x69 | (0x32 << 8),
|
||||
}
|
||||
/**
|
||||
* Logical, parsed entry from an E2Store file.
|
||||
*/
|
||||
export interface Entry {
|
||||
type: EntryType;
|
||||
data: Uint8Array;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps slots to file positions in an era file.
|
||||
* - Block index: count = SLOTS_PER_HISTORICAL_ROOT, maps slots to blocks
|
||||
* - State index: count = 1, points to the era state
|
||||
* - Zero offset = empty slot (no block)
|
||||
*/
|
||||
export interface SlotIndex {
|
||||
type: EntryType.SlotIndex;
|
||||
/** First slot covered by this index (era * SLOTS_PER_HISTORICAL_ROOT) */
|
||||
startSlot: Slot;
|
||||
/** File positions where data can be found. Length varies by index type. */
|
||||
offsets: number[];
|
||||
/** File position where this index record starts */
|
||||
recordStart: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* The complete version record (8 bytes total).
|
||||
*/
|
||||
export const VERSION_RECORD_BYTES = new Uint8Array([0x65, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
|
||||
|
||||
/**
|
||||
* E2Store header size in bytes
|
||||
*/
|
||||
export const E2STORE_HEADER_SIZE = 8;
|
||||
|
||||
/**
|
||||
* Helper to read entry at a specific offset from an open file handle.
|
||||
* Reads header first to determine data length, then reads the complete entry.
|
||||
*/
|
||||
export async function readEntry(fh: FileHandle, offset: number): Promise<Entry> {
|
||||
// Read header (8 bytes)
|
||||
const header = new Uint8Array(E2STORE_HEADER_SIZE);
|
||||
await fh.read(header, 0, E2STORE_HEADER_SIZE, offset);
|
||||
const {type, length} = parseEntryHeader(header);
|
||||
|
||||
// Read entry payload/data
|
||||
const data = new Uint8Array(length);
|
||||
await fh.read(data, 0, data.length, offset + E2STORE_HEADER_SIZE);
|
||||
|
||||
return {type, data};
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an e2Store entry (header + data)
|
||||
* Header: 2 bytes type + 4 bytes length (LE) + 2 bytes reserved (must be 0)
|
||||
*/
|
||||
export function parseEntryHeader(header: Uint8Array): {type: EntryType; length: number} {
|
||||
if (header.length < E2STORE_HEADER_SIZE) {
|
||||
throw new Error(`Buffer too small for E2Store header: need ${E2STORE_HEADER_SIZE} bytes, got ${header.length}`);
|
||||
}
|
||||
|
||||
// validate entry type from first 2 bytes
|
||||
const typeCode = readUint16(header, 0);
|
||||
if (!(typeCode in EntryType)) {
|
||||
throw new Error(`Unknown E2Store entry type: 0x${typeCode.toString(16)}`);
|
||||
}
|
||||
const type = typeCode as EntryType;
|
||||
|
||||
// Parse data length from next 4 bytes (offset 2, little endian)
|
||||
const length = readUint32(header, 2);
|
||||
|
||||
// Validate reserved bytes are zero (offset 6-7)
|
||||
const reserved = readUint16(header, 6);
|
||||
if (reserved !== 0) {
|
||||
throw new Error(`E2Store reserved bytes must be zero, got: ${reserved}`);
|
||||
}
|
||||
|
||||
return {type, length};
|
||||
}
|
||||
|
||||
export async function readVersion(fh: FileHandle, offset: number): Promise<void> {
|
||||
const versionHeader = new Uint8Array(E2STORE_HEADER_SIZE);
|
||||
await fh.read(versionHeader, 0, E2STORE_HEADER_SIZE, offset);
|
||||
if (Buffer.compare(versionHeader, VERSION_RECORD_BYTES) !== 0) {
|
||||
throw new Error("Invalid E2Store version record");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a SlotIndex from a file handle.
|
||||
*/
|
||||
export async function readSlotIndex(fh: FileHandle, offset: number): Promise<SlotIndex> {
|
||||
const recordEnd = offset;
|
||||
const countBuffer = new Uint8Array(8);
|
||||
await fh.read(countBuffer, 0, 8, recordEnd - 8);
|
||||
const count = readInt48(countBuffer, 0);
|
||||
|
||||
const recordStart = recordEnd - (8 * count + 24);
|
||||
|
||||
// Validate index position is within file bounds
|
||||
if (recordStart < 0) {
|
||||
throw new Error(`SlotIndex position ${recordStart} is invalid - file too small for count=${count}`);
|
||||
}
|
||||
|
||||
// Read and validate the slot index entry
|
||||
const entry = await readEntry(fh, recordStart);
|
||||
if (entry.type !== EntryType.SlotIndex) {
|
||||
throw new Error(`Expected SlotIndex entry, got ${entry.type}`);
|
||||
}
|
||||
|
||||
// Size: startSlot(8) + offsets(count*8) + count(8) = count*8 + 16
|
||||
const expectedSize = 8 * count + 16;
|
||||
if (entry.data.length !== expectedSize) {
|
||||
throw new Error(`SlotIndex payload size must be exactly ${expectedSize} bytes, got ${entry.data.length}`);
|
||||
}
|
||||
|
||||
// Parse start slot from payload
|
||||
const startSlot = readInt48(entry.data, 0);
|
||||
|
||||
const offsets: number[] = [];
|
||||
for (let i = 0; i < count; i++) {
|
||||
offsets.push(readInt48(entry.data, 8 * i + 8));
|
||||
}
|
||||
|
||||
return {
|
||||
type: EntryType.SlotIndex,
|
||||
startSlot,
|
||||
offsets,
|
||||
recordStart,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a single E2Store TLV entry (header + payload)
|
||||
* Header layout: type[2] | length u32 LE | reserved u16(=0)
|
||||
*/
|
||||
export async function writeEntry(fh: FileHandle, offset: number, type: EntryType, payload: Uint8Array): Promise<void> {
|
||||
const header = new Uint8Array(E2STORE_HEADER_SIZE);
|
||||
writeUint16(header, 0, type); // type (2 bytes)
|
||||
writeUint32(header, 2, payload.length); // length (4 bytes)
|
||||
// reserved bytes (6-7) remain 0
|
||||
await fh.writev([header, payload], offset);
|
||||
}
|
||||
|
||||
export async function writeVersion(fh: FileHandle, offset: number): Promise<void> {
|
||||
await fh.write(VERSION_RECORD_BYTES, 0, VERSION_RECORD_BYTES.length, offset);
|
||||
}
|
||||
|
||||
export function serializeSlotIndex(slotIndex: SlotIndex): Uint8Array {
|
||||
const count = slotIndex.offsets.length;
|
||||
const payload = new Uint8Array(count * 8 + 16);
|
||||
|
||||
// startSlot
|
||||
writeInt48(payload, 0, slotIndex.startSlot);
|
||||
|
||||
// offsets
|
||||
let off = 8;
|
||||
for (let i = 0; i < count; i++, off += 8) {
|
||||
writeInt48(payload, off, slotIndex.offsets[i]);
|
||||
}
|
||||
|
||||
// trailing count
|
||||
writeInt48(payload, 8 + count * 8, count);
|
||||
return payload;
|
||||
}
|
||||
3
packages/era/src/era/index.ts
Normal file
3
packages/era/src/era/index.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
export * from "./reader.js";
|
||||
export * from "./util.js";
|
||||
export * from "./writer.js";
|
||||
196
packages/era/src/era/reader.ts
Normal file
196
packages/era/src/era/reader.ts
Normal file
@@ -0,0 +1,196 @@
|
||||
import {type FileHandle, open} from "node:fs/promises";
|
||||
import {basename} from "node:path";
|
||||
import {PublicKey, Signature, verify} from "@chainsafe/blst";
|
||||
import {ChainForkConfig, createCachedGenesis} from "@lodestar/config";
|
||||
import {DOMAIN_BEACON_PROPOSER, SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
|
||||
import {BeaconState, SignedBeaconBlock, Slot, ssz} from "@lodestar/types";
|
||||
import {E2STORE_HEADER_SIZE, EntryType, readEntry, readVersion} from "../e2s.ts";
|
||||
import {snappyUncompress} from "../util.ts";
|
||||
import {
|
||||
EraIndices,
|
||||
computeEraNumberFromBlockSlot,
|
||||
parseEraName,
|
||||
readAllEraIndices,
|
||||
readSlotFromBeaconStateBytes,
|
||||
} from "./util.ts";
|
||||
|
||||
/**
|
||||
* EraReader is responsible for reading and validating ERA files.
|
||||
*
|
||||
* See https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md
|
||||
*/
|
||||
export class EraReader {
|
||||
readonly config: ChainForkConfig;
|
||||
/** The underlying file handle */
|
||||
readonly fh: FileHandle;
|
||||
/** The era number retrieved from the file name */
|
||||
readonly eraNumber: number;
|
||||
/** The short historical root retrieved from the file name */
|
||||
readonly shortHistoricalRoot: string;
|
||||
/** An array of state and block indices, one per group */
|
||||
readonly groups: EraIndices[];
|
||||
|
||||
constructor(
|
||||
config: ChainForkConfig,
|
||||
fh: FileHandle,
|
||||
eraNumber: number,
|
||||
shortHistoricalRoot: string,
|
||||
indices: EraIndices[]
|
||||
) {
|
||||
this.config = config;
|
||||
this.fh = fh;
|
||||
this.eraNumber = eraNumber;
|
||||
this.shortHistoricalRoot = shortHistoricalRoot;
|
||||
this.groups = indices;
|
||||
}
|
||||
|
||||
static async open(config: ChainForkConfig, path: string): Promise<EraReader> {
|
||||
const fh = await open(path, "r");
|
||||
const name = basename(path);
|
||||
const {configName, eraNumber, shortHistoricalRoot} = parseEraName(name);
|
||||
if (config.CONFIG_NAME !== configName) {
|
||||
throw new Error(`Config name mismatch: expected ${config.CONFIG_NAME}, got ${configName}`);
|
||||
}
|
||||
const indices = await readAllEraIndices(fh);
|
||||
return new EraReader(config, fh, eraNumber, shortHistoricalRoot, indices);
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the underlying file descriptor
|
||||
*
|
||||
* No further actions can be taken after this operation
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
await this.fh.close();
|
||||
}
|
||||
|
||||
async readCompressedState(eraNumber?: number): Promise<Uint8Array> {
|
||||
eraNumber = eraNumber ?? this.eraNumber;
|
||||
const index = this.groups.at(eraNumber - this.eraNumber);
|
||||
if (!index) {
|
||||
throw new Error(`No index found for era number ${eraNumber}`);
|
||||
}
|
||||
const entry = await readEntry(this.fh, index.stateIndex.recordStart + index.stateIndex.offsets[0]);
|
||||
|
||||
if (entry.type !== EntryType.CompressedBeaconState) {
|
||||
throw new Error(`Expected CompressedBeaconState, got ${entry.type}`);
|
||||
}
|
||||
|
||||
return entry.data;
|
||||
}
|
||||
|
||||
async readSerializedState(eraNumber?: number): Promise<Uint8Array> {
|
||||
const compressed = await this.readCompressedState(eraNumber);
|
||||
return snappyUncompress(compressed);
|
||||
}
|
||||
|
||||
async readState(eraNumber?: number): Promise<BeaconState> {
|
||||
const serialized = await this.readSerializedState(eraNumber);
|
||||
const stateSlot = readSlotFromBeaconStateBytes(serialized);
|
||||
return this.config.getForkTypes(stateSlot).BeaconState.deserialize(serialized);
|
||||
}
|
||||
|
||||
async readCompressedBlock(slot: Slot): Promise<Uint8Array | null> {
|
||||
const slotEra = computeEraNumberFromBlockSlot(slot);
|
||||
const index = this.groups.at(slotEra - this.eraNumber);
|
||||
if (!index) {
|
||||
throw new Error(`Slot ${slot} is out of range`);
|
||||
}
|
||||
if (!index.blocksIndex) {
|
||||
throw new Error(`No block index found for era number ${slotEra}`);
|
||||
}
|
||||
// Calculate offset within the index
|
||||
const indexOffset = slot - index.blocksIndex.startSlot;
|
||||
const offset = index.blocksIndex.recordStart + index.blocksIndex.offsets[indexOffset];
|
||||
if (offset === 0) {
|
||||
return null; // Empty slot
|
||||
}
|
||||
|
||||
const entry = await readEntry(this.fh, offset);
|
||||
if (entry.type !== EntryType.CompressedSignedBeaconBlock) {
|
||||
throw new Error(`Expected CompressedSignedBeaconBlock, got ${EntryType[entry.type] ?? "unknown"}`);
|
||||
}
|
||||
return entry.data;
|
||||
}
|
||||
|
||||
async readSerializedBlock(slot: Slot): Promise<Uint8Array | null> {
|
||||
const compressed = await this.readCompressedBlock(slot);
|
||||
if (compressed === null) return null;
|
||||
return snappyUncompress(compressed);
|
||||
}
|
||||
|
||||
async readBlock(slot: Slot): Promise<SignedBeaconBlock | null> {
|
||||
const serialized = await this.readSerializedBlock(slot);
|
||||
if (serialized === null) return null;
|
||||
return this.config.getForkTypes(slot).SignedBeaconBlock.deserialize(serialized);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate the era file.
|
||||
* - e2s format correctness
|
||||
* - era range correctness
|
||||
* - network correctness for state and blocks
|
||||
* - block root and signature matches
|
||||
*/
|
||||
async validate(): Promise<void> {
|
||||
for (let groupIndex = 0; groupIndex < this.groups.length; groupIndex++) {
|
||||
const eraNumber = this.eraNumber + groupIndex;
|
||||
const index = this.groups[groupIndex];
|
||||
|
||||
// validate version entry
|
||||
const start = index.blocksIndex
|
||||
? index.blocksIndex.recordStart + index.blocksIndex.offsets[0] - E2STORE_HEADER_SIZE
|
||||
: index.stateIndex.recordStart + index.stateIndex.offsets[0] - E2STORE_HEADER_SIZE;
|
||||
await readVersion(this.fh, start);
|
||||
|
||||
// validate state
|
||||
// the state is loadable and consistent with the given runtime configuration
|
||||
const state = await this.readState(eraNumber);
|
||||
const cachedGenesis = createCachedGenesis(this.config, state.genesisValidatorsRoot);
|
||||
|
||||
if (eraNumber === 0 && index.blocksIndex) {
|
||||
throw new Error("Genesis era (era 0) should not have blocks index");
|
||||
}
|
||||
if (eraNumber !== 0) {
|
||||
if (!index.blocksIndex) {
|
||||
throw new Error(`Era ${eraNumber} is missing blocks index`);
|
||||
}
|
||||
|
||||
// validate blocks
|
||||
for (
|
||||
let slot = index.blocksIndex.startSlot;
|
||||
slot < index.blocksIndex.startSlot + index.blocksIndex.offsets.length;
|
||||
slot++
|
||||
) {
|
||||
const block = await this.readBlock(slot);
|
||||
if (block === null) {
|
||||
if (slot === index.blocksIndex.startSlot) continue; // first slot in the era can't be easily validated
|
||||
if (
|
||||
Buffer.compare(
|
||||
state.blockRoots[(slot - 1) % SLOTS_PER_HISTORICAL_ROOT],
|
||||
state.blockRoots[slot % SLOTS_PER_HISTORICAL_ROOT]
|
||||
) !== 0
|
||||
) {
|
||||
throw new Error(`Block root mismatch at slot ${slot} for empty slot`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const blockRoot = this.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);
|
||||
if (Buffer.compare(blockRoot, state.blockRoots[slot % SLOTS_PER_HISTORICAL_ROOT]) !== 0) {
|
||||
throw new Error(`Block root mismatch at slot ${slot}`);
|
||||
}
|
||||
const msg = ssz.phase0.SigningData.hashTreeRoot({
|
||||
objectRoot: blockRoot,
|
||||
domain: cachedGenesis.getDomain(slot, DOMAIN_BEACON_PROPOSER),
|
||||
});
|
||||
const pk = PublicKey.fromBytes(state.validators[block.message.proposerIndex].pubkey);
|
||||
const sig = Signature.fromBytes(block.signature);
|
||||
if (!verify(msg, pk, sig, true, true)) {
|
||||
throw new Error(`Block signature verification failed at slot ${slot}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
134
packages/era/src/era/util.ts
Normal file
134
packages/era/src/era/util.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import type {FileHandle} from "node:fs/promises";
|
||||
import {ChainForkConfig} from "@lodestar/config";
|
||||
import {SLOTS_PER_HISTORICAL_ROOT, isForkPostCapella} from "@lodestar/params";
|
||||
import {BeaconState, Slot, capella, ssz} from "@lodestar/types";
|
||||
import {E2STORE_HEADER_SIZE, SlotIndex, readSlotIndex} from "../e2s.ts";
|
||||
import {readUint48} from "../util.ts";
|
||||
|
||||
/**
|
||||
* Parsed components of an .era file name.
|
||||
* Format: <config-name>-<era-number>-<short-historical-root>.era
|
||||
*/
|
||||
export interface EraFileName {
|
||||
/** CONFIG_NAME field of runtime config (mainnet, sepolia, holesky, etc.) */
|
||||
configName: string;
|
||||
/** Number of the first era stored in file, 5-digit zero-padded (00000, 00001, etc.) */
|
||||
eraNumber: number;
|
||||
/** First 4 bytes of last historical root, lower-case hex-encoded (8 chars) */
|
||||
shortHistoricalRoot: string;
|
||||
}
|
||||
|
||||
export interface EraIndices {
|
||||
stateIndex: SlotIndex;
|
||||
blocksIndex?: SlotIndex;
|
||||
}
|
||||
|
||||
/** Return true if `slot` is within the era range */
|
||||
export function isSlotInRange(slot: Slot, eraNumber: number): boolean {
|
||||
return computeEraNumberFromBlockSlot(slot) === eraNumber;
|
||||
}
|
||||
|
||||
export function isValidEraStateSlot(slot: Slot, eraNumber: number): boolean {
|
||||
return slot % SLOTS_PER_HISTORICAL_ROOT === 0 && slot / SLOTS_PER_HISTORICAL_ROOT === eraNumber;
|
||||
}
|
||||
|
||||
export function computeEraNumberFromBlockSlot(slot: Slot): number {
|
||||
return Math.floor(slot / SLOTS_PER_HISTORICAL_ROOT) + 1;
|
||||
}
|
||||
|
||||
export function computeStartBlockSlotFromEraNumber(eraNumber: number): Slot {
|
||||
if (eraNumber === 0) {
|
||||
throw new Error("Genesis era (era 0) does not contain blocks");
|
||||
}
|
||||
return (eraNumber - 1) * SLOTS_PER_HISTORICAL_ROOT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse era filename.
|
||||
*
|
||||
* Format: `<config-name>-<era-number>-<short-historical-root>.era`
|
||||
*/
|
||||
export function parseEraName(filename: string): {configName: string; eraNumber: number; shortHistoricalRoot: string} {
|
||||
const match = filename.match(/^(.*)-(\d{5})-([0-9a-f]{8})\.era$/);
|
||||
if (!match) {
|
||||
throw new Error(`Invalid era filename format: ${filename}`);
|
||||
}
|
||||
return {
|
||||
configName: match[1],
|
||||
eraNumber: parseInt(match[2], 10),
|
||||
shortHistoricalRoot: match[3],
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all indices from an era file.
|
||||
*/
|
||||
export async function readAllEraIndices(fh: FileHandle): Promise<EraIndices[]> {
|
||||
let end = (await fh.stat()).size;
|
||||
|
||||
const indices: EraIndices[] = [];
|
||||
while (end > E2STORE_HEADER_SIZE) {
|
||||
const index = await readEraIndexes(fh, end);
|
||||
indices.push(index);
|
||||
end = index.blocksIndex
|
||||
? index.blocksIndex.recordStart + index.blocksIndex.offsets[0] - E2STORE_HEADER_SIZE
|
||||
: index.stateIndex.recordStart + index.stateIndex.offsets[0] - E2STORE_HEADER_SIZE;
|
||||
}
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read state and block SlotIndex entries from an era file and validate alignment.
|
||||
*/
|
||||
export async function readEraIndexes(fh: FileHandle, end: number): Promise<EraIndices> {
|
||||
const stateIndex = await readSlotIndex(fh, end);
|
||||
if (stateIndex.offsets.length !== 1) {
|
||||
throw new Error(`State SlotIndex must have exactly one offset, got ${stateIndex.offsets.length}`);
|
||||
}
|
||||
|
||||
// Read block index if not genesis era (era 0)
|
||||
let blocksIndex: SlotIndex | undefined;
|
||||
if (stateIndex.startSlot > 0) {
|
||||
blocksIndex = await readSlotIndex(fh, stateIndex.recordStart);
|
||||
if (blocksIndex.offsets.length !== SLOTS_PER_HISTORICAL_ROOT) {
|
||||
throw new Error(
|
||||
`Block SlotIndex must have exactly ${SLOTS_PER_HISTORICAL_ROOT} offsets, got ${blocksIndex.offsets.length}`
|
||||
);
|
||||
}
|
||||
|
||||
// Validate block and state indices are properly aligned
|
||||
const expectedBlockStartSlot = stateIndex.startSlot - SLOTS_PER_HISTORICAL_ROOT;
|
||||
if (blocksIndex.startSlot !== expectedBlockStartSlot) {
|
||||
throw new Error(
|
||||
`Block index alignment error: expected startSlot=${expectedBlockStartSlot}, ` +
|
||||
`got startSlot=${blocksIndex.startSlot} (should be exactly one era before state)`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {stateIndex, blocksIndex};
|
||||
}
|
||||
|
||||
export function readSlotFromBeaconStateBytes(beaconStateBytes: Uint8Array): Slot {
|
||||
// not technically a Uint48, but for practical purposes fits within 6 bytes
|
||||
return readUint48(
|
||||
beaconStateBytes,
|
||||
// slot is at offset 40: 8 (genesisTime) + 32 (genesisValidatorsRoot)
|
||||
40
|
||||
);
|
||||
}
|
||||
|
||||
export function getShortHistoricalRoot(config: ChainForkConfig, state: BeaconState): string {
|
||||
return Buffer.from(
|
||||
state.slot === 0
|
||||
? state.genesisValidatorsRoot
|
||||
: // Post-Capella, historical_roots is replaced by historical_summaries
|
||||
isForkPostCapella(config.getForkName(state.slot))
|
||||
? ssz.capella.HistoricalSummary.hashTreeRoot(
|
||||
(state as capella.BeaconState).historicalSummaries.at(-1) as capella.BeaconState["historicalSummaries"][0]
|
||||
)
|
||||
: (state.historicalRoots.at(-1) as Uint8Array)
|
||||
)
|
||||
.subarray(0, 4)
|
||||
.toString("hex");
|
||||
}
|
||||
206
packages/era/src/era/writer.ts
Normal file
206
packages/era/src/era/writer.ts
Normal file
@@ -0,0 +1,206 @@
|
||||
import {type FileHandle, open, rename} from "node:fs/promises";
|
||||
import {format, parse} from "node:path";
|
||||
import {ChainForkConfig} from "@lodestar/config";
|
||||
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
|
||||
import {BeaconState, SignedBeaconBlock, Slot} from "@lodestar/types";
|
||||
import {E2STORE_HEADER_SIZE, EntryType, SlotIndex, serializeSlotIndex, writeEntry} from "../e2s.ts";
|
||||
import {snappyCompress} from "../util.ts";
|
||||
import {
|
||||
computeStartBlockSlotFromEraNumber,
|
||||
getShortHistoricalRoot,
|
||||
isSlotInRange,
|
||||
isValidEraStateSlot,
|
||||
} from "./util.ts";
|
||||
|
||||
enum WriterStateType {
|
||||
InitGroup,
|
||||
WriteGroup,
|
||||
FinishedGroup,
|
||||
}
|
||||
|
||||
type WriterState =
|
||||
| {
|
||||
type: WriterStateType.InitGroup;
|
||||
eraNumber: number;
|
||||
currentOffset: number;
|
||||
}
|
||||
| {
|
||||
type: WriterStateType.WriteGroup;
|
||||
eraNumber: number;
|
||||
currentOffset: number;
|
||||
blockOffsets: number[];
|
||||
lastSlot: Slot;
|
||||
}
|
||||
| {
|
||||
type: WriterStateType.FinishedGroup;
|
||||
eraNumber: number;
|
||||
currentOffset: number;
|
||||
shortHistoricalRoot: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* EraWriter is responsible for writing ERA files.
|
||||
*
|
||||
* See https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md
|
||||
*/
|
||||
export class EraWriter {
|
||||
config: ChainForkConfig;
|
||||
path: string;
|
||||
fh: FileHandle;
|
||||
eraNumber: number;
|
||||
state: WriterState;
|
||||
|
||||
constructor(config: ChainForkConfig, path: string, fh: FileHandle, eraNumber: number) {
|
||||
this.config = config;
|
||||
this.path = path;
|
||||
this.fh = fh;
|
||||
this.eraNumber = eraNumber;
|
||||
this.state = {
|
||||
type: WriterStateType.InitGroup,
|
||||
eraNumber,
|
||||
currentOffset: 0,
|
||||
};
|
||||
}
|
||||
|
||||
static async create(config: ChainForkConfig, path: string, eraNumber: number): Promise<EraWriter> {
|
||||
const fh = await open(path, "w");
|
||||
return new EraWriter(config, path, fh, eraNumber);
|
||||
}
|
||||
|
||||
async finish(): Promise<string> {
|
||||
if (this.state.type !== WriterStateType.FinishedGroup) {
|
||||
throw new Error("Writer has not been finished");
|
||||
}
|
||||
await this.fh.close();
|
||||
|
||||
const pathParts = parse(this.path);
|
||||
const newPath = format({
|
||||
...pathParts,
|
||||
base: `${this.config.CONFIG_NAME}-${String(this.eraNumber).padStart(5, "0")}-${this.state.shortHistoricalRoot}.era`,
|
||||
});
|
||||
await rename(this.path, newPath);
|
||||
|
||||
return newPath;
|
||||
}
|
||||
|
||||
async writeVersion(): Promise<void> {
|
||||
if (this.state.type === WriterStateType.FinishedGroup) {
|
||||
this.state = {
|
||||
type: WriterStateType.InitGroup,
|
||||
eraNumber: this.state.eraNumber + 1,
|
||||
currentOffset: this.state.currentOffset,
|
||||
};
|
||||
}
|
||||
if (this.state.type !== WriterStateType.InitGroup) {
|
||||
throw new Error("Writer has already been initialized");
|
||||
}
|
||||
await writeEntry(this.fh, this.state.currentOffset, EntryType.Version, new Uint8Array(0));
|
||||
// Move to writing blocks/state
|
||||
this.state = {
|
||||
type: WriterStateType.WriteGroup,
|
||||
eraNumber: this.state.eraNumber,
|
||||
currentOffset: this.state.currentOffset + E2STORE_HEADER_SIZE,
|
||||
blockOffsets: [],
|
||||
lastSlot: computeStartBlockSlotFromEraNumber(this.state.eraNumber) - 1,
|
||||
};
|
||||
}
|
||||
|
||||
async writeCompressedState(slot: Slot, shortHistoricalRoot: string, data: Uint8Array): Promise<void> {
|
||||
if (this.state.type === WriterStateType.InitGroup) {
|
||||
await this.writeVersion();
|
||||
}
|
||||
if (this.state.type !== WriterStateType.WriteGroup) {
|
||||
throw new Error("unreachable");
|
||||
}
|
||||
const expectedSlot = this.state.eraNumber * SLOTS_PER_HISTORICAL_ROOT;
|
||||
if (!isValidEraStateSlot(slot, this.state.eraNumber)) {
|
||||
throw new Error(`State slot must be ${expectedSlot} for era ${this.eraNumber}, got ${slot}`);
|
||||
}
|
||||
for (let s = this.state.lastSlot + 1; s < slot; s++) {
|
||||
this.state.blockOffsets.push(0); // Empty slot
|
||||
}
|
||||
const stateOffset = this.state.currentOffset;
|
||||
await writeEntry(this.fh, this.state.currentOffset, EntryType.CompressedBeaconState, data);
|
||||
this.state.currentOffset += E2STORE_HEADER_SIZE + data.length;
|
||||
|
||||
if (this.state.eraNumber !== 0) {
|
||||
const blocksIndex: SlotIndex = {
|
||||
type: EntryType.SlotIndex,
|
||||
startSlot: computeStartBlockSlotFromEraNumber(this.state.eraNumber),
|
||||
offsets: this.state.blockOffsets.map((o) => o - this.state.currentOffset),
|
||||
recordStart: this.state.currentOffset,
|
||||
};
|
||||
const blocksIndexPayload = serializeSlotIndex(blocksIndex);
|
||||
await writeEntry(this.fh, this.state.currentOffset, EntryType.SlotIndex, blocksIndexPayload);
|
||||
this.state.currentOffset += E2STORE_HEADER_SIZE + blocksIndexPayload.length;
|
||||
}
|
||||
const stateIndex: SlotIndex = {
|
||||
type: EntryType.SlotIndex,
|
||||
startSlot: slot,
|
||||
offsets: [stateOffset - this.state.currentOffset],
|
||||
recordStart: this.state.currentOffset,
|
||||
};
|
||||
const stateIndexPayload = serializeSlotIndex(stateIndex);
|
||||
await writeEntry(this.fh, this.state.currentOffset, EntryType.SlotIndex, stateIndexPayload);
|
||||
this.state.currentOffset += E2STORE_HEADER_SIZE + stateIndexPayload.length;
|
||||
|
||||
this.state = {
|
||||
type: WriterStateType.FinishedGroup,
|
||||
eraNumber: this.state.eraNumber,
|
||||
currentOffset: this.state.currentOffset,
|
||||
shortHistoricalRoot,
|
||||
};
|
||||
}
|
||||
|
||||
async writeSerializedState(slot: Slot, shortHistoricalRoot: string, data: Uint8Array): Promise<void> {
|
||||
const compressed = await snappyCompress(data);
|
||||
await this.writeCompressedState(slot, shortHistoricalRoot, compressed);
|
||||
}
|
||||
|
||||
async writeState(state: BeaconState): Promise<void> {
|
||||
const slot = state.slot;
|
||||
const shortHistoricalRoot = getShortHistoricalRoot(this.config, state);
|
||||
const ssz = this.config.getForkTypes(slot).BeaconState.serialize(state);
|
||||
|
||||
await this.writeSerializedState(slot, shortHistoricalRoot, ssz);
|
||||
}
|
||||
|
||||
async writeCompressedBlock(slot: Slot, data: Uint8Array): Promise<void> {
|
||||
if (this.state.type === WriterStateType.InitGroup) {
|
||||
await this.writeVersion();
|
||||
}
|
||||
if (this.state.type !== WriterStateType.WriteGroup) {
|
||||
throw new Error("Cannot write blocks after writing canonical state");
|
||||
}
|
||||
if (this.eraNumber === 0) {
|
||||
throw new Error("Genesis era (era 0) does not contain blocks");
|
||||
}
|
||||
|
||||
const blockEra = this.state.eraNumber;
|
||||
if (!isSlotInRange(slot, blockEra)) {
|
||||
throw new Error(`Slot ${slot} is not in valid block range for era ${blockEra}`);
|
||||
}
|
||||
if (slot <= this.state.lastSlot) {
|
||||
throw new Error(`Slots must be written in ascending order. Last slot: ${this.state.lastSlot}, got: ${slot}`);
|
||||
}
|
||||
for (let s = this.state.lastSlot + 1; s < slot; s++) {
|
||||
this.state.blockOffsets.push(0); // Empty slot
|
||||
}
|
||||
await writeEntry(this.fh, this.state.currentOffset, EntryType.CompressedSignedBeaconBlock, data);
|
||||
this.state.blockOffsets.push(this.state.currentOffset);
|
||||
this.state.currentOffset += E2STORE_HEADER_SIZE + data.length;
|
||||
this.state.lastSlot = slot;
|
||||
}
|
||||
|
||||
async writeSerializedBlock(slot: Slot, data: Uint8Array): Promise<void> {
|
||||
const compressed = await snappyCompress(data);
|
||||
await this.writeCompressedBlock(slot, compressed);
|
||||
}
|
||||
|
||||
async writeBlock(block: SignedBeaconBlock): Promise<void> {
|
||||
const slot = block.message.slot;
|
||||
const types = this.config.getForkTypes(slot);
|
||||
const ssz = types.SignedBeaconBlock.serialize(block);
|
||||
await this.writeSerializedBlock(slot, ssz);
|
||||
}
|
||||
}
|
||||
2
packages/era/src/index.ts
Normal file
2
packages/era/src/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * as e2s from "./e2s.js";
|
||||
export * as era from "./era/index.js";
|
||||
60
packages/era/src/util.ts
Normal file
60
packages/era/src/util.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import {Uint8ArrayList} from "uint8arraylist";
|
||||
import {SnappyFramesUncompress, encodeSnappy} from "@lodestar/reqresp/utils";
|
||||
|
||||
/** Read 48-bit signed integer (little-endian) at offset. */
|
||||
export function readInt48(bytes: Uint8Array, offset: number): number {
|
||||
return Buffer.prototype.readIntLE.call(bytes, offset, 6);
|
||||
}
|
||||
|
||||
/** Read 48-bit unsigned integer (little-endian) at offset. */
|
||||
export function readUint48(bytes: Uint8Array, offset: number): number {
|
||||
return Buffer.prototype.readUintLE.call(bytes, offset, 6);
|
||||
}
|
||||
|
||||
/** Read 16-bit unsigned integer (little-endian) at offset. */
|
||||
export function readUint16(bytes: Uint8Array, offset: number): number {
|
||||
return Buffer.prototype.readUint16LE.call(bytes, offset);
|
||||
}
|
||||
|
||||
/** Read 32-bit unsigned integer (little-endian) at offset. */
|
||||
export function readUint32(bytes: Uint8Array, offset: number): number {
|
||||
return Buffer.prototype.readUint32LE.call(bytes, offset);
|
||||
}
|
||||
|
||||
/** Write 48-bit signed integer (little-endian) into target at offset. */
|
||||
export function writeInt48(target: Uint8Array, offset: number, v: number): void {
|
||||
Buffer.prototype.writeIntLE.call(target, v, offset, 6);
|
||||
}
|
||||
|
||||
/** Write 16-bit unsigned integer (little-endian) into target at offset. */
|
||||
export function writeUint16(target: Uint8Array, offset: number, v: number): void {
|
||||
Buffer.prototype.writeUint16LE.call(target, v, offset);
|
||||
}
|
||||
|
||||
/** Write 32-bit unsigned integer (little-endian) into target at offset. */
|
||||
export function writeUint32(target: Uint8Array, offset: number, v: number): void {
|
||||
Buffer.prototype.writeUint32LE.call(target, v, offset);
|
||||
}
|
||||
|
||||
/** Decompress snappy-framed data */
|
||||
export function snappyUncompress(compressedData: Uint8Array): Uint8Array {
|
||||
const decompressor = new SnappyFramesUncompress();
|
||||
|
||||
const input = new Uint8ArrayList(compressedData);
|
||||
const result = decompressor.uncompress(input);
|
||||
|
||||
if (result === null) {
|
||||
throw new Error("Snappy decompression failed - no data returned");
|
||||
}
|
||||
|
||||
return result.subarray();
|
||||
}
|
||||
|
||||
/** Compress data using snappy framing */
|
||||
export async function snappyCompress(data: Uint8Array): Promise<Uint8Array> {
|
||||
const buffers: Buffer[] = [];
|
||||
for await (const chunk of encodeSnappy(Buffer.from(data.buffer, data.byteOffset, data.byteLength))) {
|
||||
buffers.push(chunk);
|
||||
}
|
||||
return Buffer.concat(buffers);
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
import {existsSync, mkdirSync} from "node:fs";
|
||||
import path, {basename} from "node:path";
|
||||
import {fileURLToPath} from "node:url";
|
||||
import {beforeAll, describe, expect, it} from "vitest";
|
||||
import {ChainForkConfig, createChainForkConfig} from "@lodestar/config";
|
||||
import {mainnetChainConfig} from "@lodestar/config/networks";
|
||||
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
|
||||
import {EraReader, EraWriter} from "../../src/era/index.ts";
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
|
||||
describe.runIf(!process.env.CI)("read original era and re-write our own era file", () => {
|
||||
let config: ChainForkConfig;
|
||||
const eraPath = path.resolve(__dirname, "../mainnet-01506-4781865b.era");
|
||||
const expectedEra = 1506;
|
||||
|
||||
beforeAll(() => {
|
||||
config = createChainForkConfig(mainnetChainConfig);
|
||||
});
|
||||
|
||||
it("validate an existing era file, rewrite into a new era file, and validate that new era file", async () => {
|
||||
const SPR = SLOTS_PER_HISTORICAL_ROOT;
|
||||
const stateSlot = expectedEra * SPR;
|
||||
|
||||
const reader = await EraReader.open(config, eraPath);
|
||||
|
||||
await reader.validate();
|
||||
const outDir = path.resolve(__dirname, "../out");
|
||||
if (!existsSync(outDir)) mkdirSync(outDir, {recursive: true});
|
||||
let outFile = path.resolve(outDir, `mainnet-${String(expectedEra).padStart(5, "0")}-deadbeef.era`);
|
||||
|
||||
const writer = await EraWriter.create(config, outFile, expectedEra);
|
||||
const blocksIndex = reader.groups[0].blocksIndex;
|
||||
if (!blocksIndex) {
|
||||
throw new Error("Original era file missing blocks index");
|
||||
}
|
||||
for (let slot = blocksIndex.startSlot; slot < blocksIndex.startSlot + blocksIndex.offsets.length; slot++) {
|
||||
const block = await reader.readBlock(slot);
|
||||
if (block === null) continue;
|
||||
await writer.writeBlock(block);
|
||||
}
|
||||
const originalState = await reader.readState();
|
||||
expect(originalState.slot).to.equal(stateSlot);
|
||||
await writer.writeState(originalState);
|
||||
await reader.close();
|
||||
outFile = await writer.finish();
|
||||
|
||||
expect(basename(outFile)).to.equal(basename(eraPath));
|
||||
const newReader = await EraReader.open(config, outFile);
|
||||
await newReader.validate();
|
||||
|
||||
await newReader.close();
|
||||
}, 1000000);
|
||||
});
|
||||
45
packages/era/test/era_downloader.sh
Executable file
45
packages/era/test/era_downloader.sh
Executable file
@@ -0,0 +1,45 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Copyright (c) 2025 Status Research & Development GmbH. Licensed under
|
||||
# either of:
|
||||
# - Apache License, version 2.0
|
||||
# - MIT license
|
||||
# at your option. This file may not be copied, modified, or distributed except
|
||||
# according to those terms.
|
||||
|
||||
# Usage:
|
||||
# - chmod +x era_downloader.sh
|
||||
# - ./era_downloader.sh # downloads mainnet-01506-4781865b.era into this test directory
|
||||
# - ./era_downloader.sh <file_url> # downloads the provided file into this test directory
|
||||
set -eo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
DOWNLOAD_DIR="$SCRIPT_DIR"
|
||||
|
||||
if [ $# -eq 0 ]; then
|
||||
DOWNLOAD_URL="https://mainnet.era.nimbus.team/mainnet-01506-4781865b.era"
|
||||
elif [ $# -eq 1 ]; then
|
||||
DOWNLOAD_URL="$1"
|
||||
else
|
||||
echo "Usage: $0 [file_url]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! command -v aria2c > /dev/null 2>&1; then
|
||||
echo "❌ aria2c is not installed. Install via: brew install aria2 (macOS) or sudo apt install aria2 (Linux)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p "$DOWNLOAD_DIR"
|
||||
|
||||
FILE_NAME=$(basename "$DOWNLOAD_URL")
|
||||
|
||||
echo "📥 Downloading $FILE_NAME to $DOWNLOAD_DIR ..."
|
||||
aria2c -x 8 -c -o "$FILE_NAME" \
|
||||
--dir="$DOWNLOAD_DIR" \
|
||||
--console-log-level=warn \
|
||||
--quiet=true \
|
||||
--summary-interval=0 \
|
||||
"$DOWNLOAD_URL"
|
||||
|
||||
echo "✅ Downloaded: $DOWNLOAD_DIR/$FILE_NAME"
|
||||
52
packages/era/test/unit/era.unit.test.ts
Normal file
52
packages/era/test/unit/era.unit.test.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import {assert, describe, it} from "vitest";
|
||||
import {E2STORE_HEADER_SIZE, EntryType, parseEntryHeader} from "../../src/e2s.ts";
|
||||
|
||||
function header(type: EntryType, dataLen: number): Uint8Array {
|
||||
const h = new Uint8Array(8);
|
||||
h[0] = type;
|
||||
h[1] = type >> 8;
|
||||
// 4-byte LE length
|
||||
h[2] = dataLen & 0xff;
|
||||
h[3] = (dataLen >> 8) & 0xff;
|
||||
h[4] = (dataLen >> 16) & 0xff;
|
||||
h[5] = (dataLen >> 24) & 0xff;
|
||||
// reserved = 0x0000
|
||||
// h[6] = 0x00;
|
||||
// h[7] = 0x00;
|
||||
return h;
|
||||
}
|
||||
|
||||
describe("e2Store utilities (unit)", () => {
|
||||
it("should read the type and data correctly", () => {
|
||||
const payload = new Uint8Array([0x01, 0x02, 0x03, 0x04]);
|
||||
const ver = header(EntryType.Version, 0);
|
||||
const bytes = new Uint8Array([...ver, ...header(EntryType.Empty, payload.length), ...payload]);
|
||||
|
||||
// Read the second entry (Empty with payload)
|
||||
const entry = parseEntryHeader(bytes.slice(E2STORE_HEADER_SIZE));
|
||||
assert.equal(entry.type, EntryType.Empty);
|
||||
assert.deepEqual(entry.length, payload.length);
|
||||
});
|
||||
|
||||
it("should iterate and read multiple entries ", () => {
|
||||
const firstPayload = new Uint8Array([0x01, 0x02, 0x03, 0x04]);
|
||||
const ver = header(EntryType.Version, 0);
|
||||
const first = new Uint8Array([...header(EntryType.Empty, firstPayload.length), ...firstPayload]);
|
||||
const second = header(EntryType.Empty, 0);
|
||||
const bytes = new Uint8Array([...ver, ...first, ...second]);
|
||||
|
||||
const entries: Array<ReturnType<typeof parseEntryHeader>> = [];
|
||||
let p = 0;
|
||||
while (p + E2STORE_HEADER_SIZE <= bytes.length) {
|
||||
const e = parseEntryHeader(bytes.slice(p));
|
||||
entries.push(e);
|
||||
p += E2STORE_HEADER_SIZE + e.length;
|
||||
}
|
||||
|
||||
assert.equal(entries.length, 3);
|
||||
assert.equal(entries[0].type, EntryType.Version);
|
||||
assert.equal(entries[0].length, 0);
|
||||
assert.equal(entries[1].type, EntryType.Empty);
|
||||
assert.equal(entries[2].type, EntryType.Empty);
|
||||
});
|
||||
});
|
||||
7
packages/era/tsconfig.build.json
Normal file
7
packages/era/tsconfig.build.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"extends": "../../tsconfig.build.json",
|
||||
"include": ["src"],
|
||||
"compilerOptions": {
|
||||
"outDir": "lib"
|
||||
}
|
||||
}
|
||||
7
packages/era/tsconfig.json
Normal file
7
packages/era/tsconfig.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"extends": "../../tsconfig.json",
|
||||
"include": ["src", "test"],
|
||||
"compilerOptions": {
|
||||
"outDir": "lib"
|
||||
}
|
||||
}
|
||||
@@ -27,8 +27,8 @@
|
||||
},
|
||||
"imports": {
|
||||
"#snappy": {
|
||||
"bun": "./src/encodingStrategies/sszSnappy/snappyFrames/snappy_bun.ts",
|
||||
"default": "./lib/encodingStrategies/sszSnappy/snappyFrames/snappy.js"
|
||||
"bun": "./src/utils/snappy_bun.ts",
|
||||
"default": "./lib/utils/snappy.js"
|
||||
}
|
||||
},
|
||||
"files": [
|
||||
|
||||
@@ -2,8 +2,8 @@ import {decode as varintDecode, encodingLength as varintEncodingLength} from "ui
|
||||
import {Uint8ArrayList} from "uint8arraylist";
|
||||
import {TypeSizes} from "../../types.js";
|
||||
import {BufferedSource} from "../../utils/index.js";
|
||||
import {SnappyFramesUncompress} from "../../utils/snappyIndex.js";
|
||||
import {SszSnappyError, SszSnappyErrorCode} from "./errors.js";
|
||||
import {SnappyFramesUncompress} from "./snappyFrames/uncompress.js";
|
||||
import {maxEncodedLen} from "./utils.js";
|
||||
|
||||
export const MAX_VARINT_BYTES = 10;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import {encode as varintEncode} from "uint8-varint";
|
||||
import {encodeSnappy} from "./snappyFrames/compress.js";
|
||||
import {encodeSnappy} from "../../utils/snappyIndex.js";
|
||||
|
||||
/**
|
||||
* ssz_snappy encoding strategy writer.
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
export {SnappyFramesUncompress, encodeSnappy} from "../../utils/snappyIndex.js";
|
||||
export * from "./decode.js";
|
||||
export * from "./encode.js";
|
||||
export * from "./errors.js";
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import {decode as varintDecode, encodingLength as varintEncodingLength} from "uint8-varint";
|
||||
import {Uint8ArrayList} from "uint8arraylist";
|
||||
import {writeSszSnappyPayload} from "../encodingStrategies/sszSnappy/encode.js";
|
||||
import {SnappyFramesUncompress} from "../encodingStrategies/sszSnappy/snappyFrames/uncompress.js";
|
||||
import {Encoding} from "../types.js";
|
||||
import {SnappyFramesUncompress} from "./snappyIndex.js";
|
||||
|
||||
// ErrorMessage schema:
|
||||
//
|
||||
|
||||
@@ -6,3 +6,4 @@ export * from "./errorMessage.js";
|
||||
export * from "./onChunk.js";
|
||||
export * from "./peerId.js";
|
||||
export * from "./protocolId.js";
|
||||
export * from "./snappyIndex.js";
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import {compress} from "#snappy";
|
||||
import {ChunkType, IDENTIFIER_FRAME, UNCOMPRESSED_CHUNK_SIZE, crc} from "./common.js";
|
||||
import {ChunkType, IDENTIFIER_FRAME, UNCOMPRESSED_CHUNK_SIZE, crc} from "./snappyCommon.js";
|
||||
|
||||
// The logic in this file is largely copied (in simplified form) from https://github.com/ChainSafe/node-snappy-stream/
|
||||
|
||||
export async function* encodeSnappy(bytes: Buffer): AsyncGenerator<Buffer> {
|
||||
yield IDENTIFIER_FRAME;
|
||||
|
||||
3
packages/reqresp/src/utils/snappyIndex.ts
Normal file
3
packages/reqresp/src/utils/snappyIndex.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
export * from "./snappyCommon.js";
|
||||
export {encodeSnappy} from "./snappyCompress.js";
|
||||
export {SnappyFramesUncompress} from "./snappyUncompress.js";
|
||||
@@ -1,6 +1,6 @@
|
||||
import {Uint8ArrayList} from "uint8arraylist";
|
||||
import {uncompress} from "#snappy";
|
||||
import {ChunkType, IDENTIFIER, UNCOMPRESSED_CHUNK_SIZE, crc} from "./common.js";
|
||||
import {ChunkType, IDENTIFIER, UNCOMPRESSED_CHUNK_SIZE, crc} from "./snappyCommon.js";
|
||||
|
||||
export class SnappyFramesUncompress {
|
||||
private buffer = new Uint8ArrayList();
|
||||
@@ -1,9 +1,13 @@
|
||||
import {pipe} from "it-pipe";
|
||||
import {Uint8ArrayList} from "uint8arraylist";
|
||||
import {describe, expect, it} from "vitest";
|
||||
import {ChunkType, IDENTIFIER_FRAME, crc} from "../../../../../src/encodingStrategies/sszSnappy/snappyFrames/common.js";
|
||||
import {encodeSnappy} from "../../../../../src/encodingStrategies/sszSnappy/snappyFrames/compress.js";
|
||||
import {SnappyFramesUncompress} from "../../../../../src/encodingStrategies/sszSnappy/snappyFrames/uncompress.js";
|
||||
import {
|
||||
ChunkType,
|
||||
IDENTIFIER_FRAME,
|
||||
SnappyFramesUncompress,
|
||||
crc,
|
||||
encodeSnappy,
|
||||
} from "../../../../../src/utils/snappyIndex.js";
|
||||
|
||||
describe("encodingStrategies / sszSnappy / snappy frames / uncompress", () => {
|
||||
it("should work with short input", () =>
|
||||
|
||||
Reference in New Issue
Block a user