feat(era-file): back to era file support (#19482)
@@ -9,7 +9,7 @@ use reth_db_api::{
    RawKey, RawTable, RawValue,
};
use reth_era::{
-    common::{decode::DecodeCompressed, file_ops::StreamReader},
+    common::{decode::DecodeCompressedRlp, file_ops::StreamReader},
    e2s::error::E2sError,
    era1::{
        file::{BlockTupleIterator, Era1Reader},
@@ -5,7 +5,7 @@ use alloy_rlp::Decodable;
use ssz::Decode;

/// Extension trait for generic decoding from compressed data
-pub trait DecodeCompressed {
+pub trait DecodeCompressedRlp {
    /// Decompress and decode the data into the given type
    fn decode<T: Decodable>(&self) -> Result<T, E2sError>;
}
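As a minimal sketch of what this rename means for callers (not part of the diff; the helper name `decode_header` is purely illustrative): only the trait's name changes, its single `decode` method stays the same, so generic code just swaps the bound from `DecodeCompressed` to `DecodeCompressedRlp`.

```rust
use alloy_consensus::Header;
use reth_era::{common::decode::DecodeCompressedRlp, e2s::error::E2sError};

/// Hypothetical helper: snappy-decompress and RLP-decode a header out of any
/// era1 execution type that implements the renamed trait (e.g. `CompressedHeader`,
/// `CompressedBody`, `CompressedReceipts`, which implement it later in this diff).
fn decode_header<C: DecodeCompressedRlp>(compressed: &C) -> Result<Header, E2sError> {
    compressed.decode::<Header>()
}
```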
crates/era/src/era/file.rs (new file, 342 lines)
@@ -0,0 +1,342 @@
//! Represents a complete Era file
//!
//! The structure of an Era file follows the specification:
//! `Version | block* | era-state | other-entries* | slot-index(block)? | slot-index(state)`
//!
//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md>.

use crate::{
    common::file_ops::{EraFileFormat, FileReader, StreamReader, StreamWriter},
    e2s::{
        error::E2sError,
        file::{E2StoreReader, E2StoreWriter},
        types::{Entry, IndexEntry, Version, SLOT_INDEX},
    },
    era::types::{
        consensus::{
            CompressedBeaconState, CompressedSignedBeaconBlock, COMPRESSED_BEACON_STATE,
            COMPRESSED_SIGNED_BEACON_BLOCK,
        },
        group::{EraGroup, EraId, SlotIndex},
    },
};
use std::{
    fs::File,
    io::{Read, Seek, Write},
};

/// Era file interface
#[derive(Debug)]
pub struct EraFile {
    /// Version record, must be the first record in the file
    pub version: Version,

    /// Main content group of the Era file
    pub group: EraGroup,

    /// File identifier
    pub id: EraId,
}

impl EraFileFormat for EraFile {
    type EraGroup = EraGroup;
    type Id = EraId;

    /// Create a new [`EraFile`]
    fn new(group: EraGroup, id: EraId) -> Self {
        Self { version: Version, group, id }
    }

    fn version(&self) -> &Version {
        &self.version
    }

    fn group(&self) -> &Self::EraGroup {
        &self.group
    }

    fn id(&self) -> &Self::Id {
        &self.id
    }
}

/// Reader for era files that builds on top of [`E2StoreReader`]
#[derive(Debug)]
pub struct EraReader<R: Read> {
    reader: E2StoreReader<R>,
}

/// An iterator over compressed signed beacon blocks streaming from [`E2StoreReader`].
#[derive(Debug)]
pub struct BeaconBlockIterator<R: Read> {
    reader: E2StoreReader<R>,
    state: Option<CompressedBeaconState>,
    other_entries: Vec<Entry>,
    block_slot_index: Option<SlotIndex>,
    state_slot_index: Option<SlotIndex>,
}

impl<R: Read> BeaconBlockIterator<R> {
    fn new(reader: E2StoreReader<R>) -> Self {
        Self {
            reader,
            state: None,
            other_entries: Default::default(),
            block_slot_index: None,
            state_slot_index: None,
        }
    }
}
impl<R: Read + Seek> Iterator for BeaconBlockIterator<R> {
    type Item = Result<CompressedSignedBeaconBlock, E2sError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_result().transpose()
    }
}

impl<R: Read + Seek> BeaconBlockIterator<R> {
    fn next_result(&mut self) -> Result<Option<CompressedSignedBeaconBlock>, E2sError> {
        loop {
            let Some(entry) = self.reader.read_next_entry()? else {
                return Ok(None);
            };

            match entry.entry_type {
                COMPRESSED_SIGNED_BEACON_BLOCK => {
                    let block = CompressedSignedBeaconBlock::from_entry(&entry)?;
                    return Ok(Some(block));
                }
                COMPRESSED_BEACON_STATE => {
                    if self.state.is_some() {
                        return Err(E2sError::Ssz("Multiple state entries found".to_string()));
                    }
                    self.state = Some(CompressedBeaconState::from_entry(&entry)?);
                }
                SLOT_INDEX => {
                    let slot_index = SlotIndex::from_entry(&entry)?;
                    // If we haven't seen the state yet, the slot index is for blocks;
                    // if we have seen the state, the slot index is for the state.
                    if self.state.is_none() {
                        self.block_slot_index = Some(slot_index);
                    } else {
                        self.state_slot_index = Some(slot_index);
                    }
                }
                _ => {
                    self.other_entries.push(entry);
                }
            }
        }
    }
}

impl<R: Read + Seek> StreamReader<R> for EraReader<R> {
    type File = EraFile;
    type Iterator = BeaconBlockIterator<R>;

    /// Create a new [`EraReader`]
    fn new(reader: R) -> Self {
        Self { reader: E2StoreReader::new(reader) }
    }

    /// Returns a [`BeaconBlockIterator`] streaming from `reader`.
    fn iter(self) -> BeaconBlockIterator<R> {
        BeaconBlockIterator::new(self.reader)
    }

    fn read(self, network_name: String) -> Result<Self::File, E2sError> {
        self.read_and_assemble(network_name)
    }
}

impl<R: Read + Seek> EraReader<R> {
    /// Reads and parses an era file from the underlying reader, assembling all components
    /// into a complete [`EraFile`] with an [`EraId`] that includes the provided network name.
    pub fn read_and_assemble(mut self, network_name: String) -> Result<EraFile, E2sError> {
        // Validate version entry
        let _version_entry = match self.reader.read_version()? {
            Some(entry) if entry.is_version() => entry,
            Some(_) => return Err(E2sError::Ssz("First entry is not a Version entry".to_string())),
            None => return Err(E2sError::Ssz("Empty Era file".to_string())),
        };

        let mut iter = self.iter();
        let blocks = (&mut iter).collect::<Result<Vec<_>, _>>()?;

        let BeaconBlockIterator {
            state, other_entries, block_slot_index, state_slot_index, ..
        } = iter;

        let state =
            state.ok_or_else(|| E2sError::Ssz("Era file missing state entry".to_string()))?;

        let state_slot_index = state_slot_index
            .ok_or_else(|| E2sError::Ssz("Era file missing state slot index".to_string()))?;

        // Create the appropriate `EraGroup`, genesis vs non-genesis
        let mut group = if let Some(block_index) = block_slot_index {
            EraGroup::with_block_index(blocks, state, block_index, state_slot_index)
        } else {
            EraGroup::new(blocks, state, state_slot_index)
        };

        // Add other entries
        for entry in other_entries {
            group.add_entry(entry);
        }

        let (start_slot, slot_count) = group.slot_range();

        let id = EraId::new(network_name, start_slot, slot_count);

        Ok(EraFile::new(group, id))
    }
}

impl FileReader for EraReader<File> {}
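A rough sketch of how this reader is meant to be driven (not part of the diff). Assumptions: `FileReader::open` takes a path plus network name, as in the test utilities later in this commit; `E2sError` converts into `Box<dyn std::error::Error>`; the hoodi file name is only a placeholder.

```rust
use std::{fs::File, path::PathBuf};

use reth_era::{
    common::file_ops::{FileReader, StreamReader},
    era::file::EraReader,
};

fn read_example() -> Result<(), Box<dyn std::error::Error>> {
    // One-shot: read and assemble the whole file, as `read_and_assemble` does above.
    let path = PathBuf::from("hoodi-00021-857e418b.era"); // placeholder file name
    let era = EraReader::open(&path, "hoodi")?;
    println!(
        "{} beacon blocks starting at slot {}",
        era.group.blocks.len(),
        era.group.starting_slot()
    );

    // Streaming: iterate compressed signed beacon blocks one entry at a time.
    let reader = EraReader::new(File::open(&path)?);
    for block in reader.iter() {
        let _ssz_bytes = block?.decompress()?; // snappy frame -> raw SSZ bytes
    }
    Ok(())
}
```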
/// Writer for Era files that builds on top of [`E2StoreWriter`]
#[derive(Debug)]
pub struct EraWriter<W: Write> {
    writer: E2StoreWriter<W>,
    has_written_version: bool,
    has_written_state: bool,
    has_written_block_slot_index: bool,
    has_written_state_slot_index: bool,
}

impl<W: Write> StreamWriter<W> for EraWriter<W> {
    type File = EraFile;

    /// Create a new [`EraWriter`]
    fn new(writer: W) -> Self {
        Self {
            writer: E2StoreWriter::new(writer),
            has_written_version: false,
            has_written_state: false,
            has_written_block_slot_index: false,
            has_written_state_slot_index: false,
        }
    }

    /// Write the version entry
    fn write_version(&mut self) -> Result<(), E2sError> {
        if self.has_written_version {
            return Ok(());
        }

        self.writer.write_version()?;
        self.has_written_version = true;
        Ok(())
    }

    fn write_file(&mut self, file: &Self::File) -> Result<(), E2sError> {
        // Write version
        self.write_version()?;

        // Write all blocks
        for block in &file.group.blocks {
            self.write_beacon_block(block)?;
        }

        // Write state
        self.write_beacon_state(&file.group.era_state)?;

        // Write other entries
        for entry in &file.group.other_entries {
            self.writer.write_entry(entry)?;
        }

        // Write slot index
        if let Some(ref block_index) = file.group.slot_index {
            self.write_block_slot_index(block_index)?;
        }

        // Write state index
        self.write_state_slot_index(&file.group.state_slot_index)?;

        self.writer.flush()?;
        Ok(())
    }

    /// Flush any buffered data to the underlying writer
    fn flush(&mut self) -> Result<(), E2sError> {
        self.writer.flush()
    }
}

impl<W: Write> EraWriter<W> {
    /// Write beacon block
    pub fn write_beacon_block(
        &mut self,
        block: &CompressedSignedBeaconBlock,
    ) -> Result<(), E2sError> {
        self.ensure_version_written()?;

        // Ensure blocks are written before state/indices
        if self.has_written_state ||
            self.has_written_block_slot_index ||
            self.has_written_state_slot_index
        {
            return Err(E2sError::Ssz("Cannot write blocks after state or indices".to_string()));
        }

        let entry = block.to_entry();
        self.writer.write_entry(&entry)?;
        Ok(())
    }

    // Write beacon state
    fn write_beacon_state(&mut self, state: &CompressedBeaconState) -> Result<(), E2sError> {
        self.ensure_version_written()?;

        if self.has_written_state {
            return Err(E2sError::Ssz("State already written".to_string()));
        }

        let entry = state.to_entry();
        self.writer.write_entry(&entry)?;
        self.has_written_state = true;
        Ok(())
    }

    /// Write the block slot index
    pub fn write_block_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
        self.ensure_version_written()?;

        if self.has_written_block_slot_index {
            return Err(E2sError::Ssz("Block slot index already written".to_string()));
        }

        let entry = slot_index.to_entry();
        self.writer.write_entry(&entry)?;
        self.has_written_block_slot_index = true;

        Ok(())
    }

    /// Write the state slot index
    pub fn write_state_slot_index(&mut self, slot_index: &SlotIndex) -> Result<(), E2sError> {
        self.ensure_version_written()?;

        if self.has_written_state_slot_index {
            return Err(E2sError::Ssz("State slot index already written".to_string()));
        }

        let entry = slot_index.to_entry();
        self.writer.write_entry(&entry)?;
        self.has_written_state_slot_index = true;

        Ok(())
    }

    /// Helper to ensure version is written before any data
    fn ensure_version_written(&mut self) -> Result<(), E2sError> {
        if !self.has_written_version {
            self.write_version()?;
        }
        Ok(())
    }
}
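A small sketch of the intended write path (not part of the diff), mirroring the buffer round-trip used by the integration tests added further down; the free function `roundtrip` is illustrative only. `write_file` enforces the `version | block* | era-state | other-entries* | slot-index(block)? | slot-index(state)` ordering described in the module docs above.

```rust
use std::io::Cursor;

use reth_era::{
    common::file_ops::{StreamReader, StreamWriter},
    e2s::error::E2sError,
    era::file::{EraFile, EraReader, EraWriter},
};

/// Illustrative round-trip: serialize an assembled `EraFile` into a byte buffer,
/// then read it back with `EraReader` under the same network name.
fn roundtrip(file: &EraFile) -> Result<EraFile, E2sError> {
    let mut buffer = Vec::new();
    {
        let mut writer = EraWriter::new(&mut buffer);
        writer.write_file(file)?;
    }

    EraReader::new(Cursor::new(&buffer)).read(file.id.network_name.clone())
}
```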
@@ -1,3 +1,4 @@
//! Core era primitives.

+pub mod file;
pub mod types;
@@ -3,10 +3,14 @@
//! See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md>

use crate::{
    common::file_ops::EraFileId,
    e2s::types::{Entry, IndexEntry, SLOT_INDEX},
    era::types::consensus::{CompressedBeaconState, CompressedSignedBeaconBlock},
};

/// Number of slots per historical root in ERA files
pub const SLOTS_PER_HISTORICAL_ROOT: u64 = 8192;

/// Era file content group
///
/// Format: `Version | block* | era-state | other-entries* | slot-index(block)? | slot-index(state)`
@@ -64,6 +68,28 @@ impl EraGroup {
    pub fn add_entry(&mut self, entry: Entry) {
        self.other_entries.push(entry);
    }

    /// Get the starting slot and slot count.
    pub const fn slot_range(&self) -> (u64, u32) {
        if let Some(ref block_index) = self.slot_index {
            // Non-genesis era: use block slot index
            (block_index.starting_slot, block_index.slot_count() as u32)
        } else {
            // Genesis era: use state slot index, it should be slot 0
            // Genesis has only the genesis state, no blocks
            (self.state_slot_index.starting_slot, 0)
        }
    }

    /// Get the starting slot number
    pub const fn starting_slot(&self) -> u64 {
        self.slot_range().0
    }

    /// Get the number of slots
    pub const fn slot_count(&self) -> u32 {
        self.slot_range().1
    }
}

/// [`SlotIndex`] records store offsets to data at specific slots
@@ -122,6 +148,93 @@ impl IndexEntry for SlotIndex {
    }
}

/// Era file identifier
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EraId {
    /// Network configuration name
    pub network_name: String,

    /// First slot number in file
    pub start_slot: u64,

    /// Number of slots in the file
    pub slot_count: u32,

    /// Optional hash identifier for this file
    /// First 4 bytes of the last historical root in the last state in the era file
    pub hash: Option<[u8; 4]>,
}

impl EraId {
    /// Create a new [`EraId`]
    pub fn new(network_name: impl Into<String>, start_slot: u64, slot_count: u32) -> Self {
        Self { network_name: network_name.into(), start_slot, slot_count, hash: None }
    }

    /// Add a hash identifier to [`EraId`]
    pub const fn with_hash(mut self, hash: [u8; 4]) -> Self {
        self.hash = Some(hash);
        self
    }

    /// Calculate which era number the file starts at
    pub const fn era_number(&self) -> u64 {
        self.start_slot / SLOTS_PER_HISTORICAL_ROOT
    }

    // Helper function to calculate the number of eras spanned by the file.
    // If the user can decide how many blocks per file there are, we need to calculate it.
    // Most of the time it should be 1, but it can never be more than 2 eras per file,
    // as there is a maximum of 8192 blocks per file.
    const fn calculate_era_count(&self) -> u64 {
        if self.slot_count == 0 {
            return 0;
        }

        let first_era = self.era_number();

        // Calculate the actual last slot number in the range
        let last_slot = self.start_slot + self.slot_count as u64 - 1;
        // Find which era the last block belongs to
        let last_era = last_slot / SLOTS_PER_HISTORICAL_ROOT;
        // Count how many eras we span
        last_era - first_era + 1
    }
}

impl EraFileId for EraId {
    fn network_name(&self) -> &str {
        &self.network_name
    }

    fn start_number(&self) -> u64 {
        self.start_slot
    }

    fn count(&self) -> u32 {
        self.slot_count
    }

    /// Convert to file name following the era file naming:
    /// `<config-name>-<era-number>-<era-count>-<short-historical-root>.era`
    /// <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md#file-name>
    /// See also <https://github.com/eth-clients/e2store-format-specs/blob/main/formats/era.md>
    fn to_file_name(&self) -> String {
        let era_number = self.era_number();
        let era_count = self.calculate_era_count();

        if let Some(hash) = self.hash {
            format!(
                "{}-{:05}-{:05}-{:02x}{:02x}{:02x}{:02x}.era",
                self.network_name, era_number, era_count, hash[0], hash[1], hash[2], hash[3]
            )
        } else {
            // era spec format with placeholder hash when no hash available
            // Format: `<config-name>-<era-number>-<era-count>-00000000.era`
            format!("{}-{:05}-{:05}-00000000.era", self.network_name, era_number, era_count)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
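To make the era numbering and file naming above concrete, here is a short sketch with made-up values (not part of the diff). The module path `era::types::group` and its public visibility are assumed from the imports used in `era/file.rs` earlier in this commit.

```rust
use reth_era::{
    common::file_ops::EraFileId,
    era::types::group::{EraId, SLOTS_PER_HISTORICAL_ROOT},
};

fn naming_example() {
    // A file starting at slot 21 * 8192 = 172032 and spanning 8192 slots
    // starts at era 21 and covers exactly one era.
    let id = EraId::new("hoodi", 21 * SLOTS_PER_HISTORICAL_ROOT, 8192);
    assert_eq!(id.era_number(), 21);

    // Without a short historical root, the placeholder hash is used.
    assert_eq!(id.to_file_name(), "hoodi-00021-00001-00000000.era");

    // With the first four bytes of the last historical root attached:
    let id = id.with_hash([0x85, 0x7e, 0x41, 0x8b]);
    assert_eq!(id.to_file_name(), "hoodi-00021-00001-857e418b.era");
}
```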
@@ -16,7 +16,7 @@
//!
//! ```rust
//! use alloy_consensus::Header;
-//! use reth_era::{common::decode::DecodeCompressed, era1::types::execution::CompressedHeader};
+//! use reth_era::{common::decode::DecodeCompressedRlp, era1::types::execution::CompressedHeader};
//!
//! let header = Header { number: 100, ..Default::default() };
//! // Compress the header: rlp encoding and Snappy compression
@@ -32,7 +32,7 @@
//! ```rust
//! use alloy_consensus::{BlockBody, Header};
//! use alloy_primitives::Bytes;
-//! use reth_era::{common::decode::DecodeCompressed, era1::types::execution::CompressedBody};
+//! use reth_era::{common::decode::DecodeCompressedRlp, era1::types::execution::CompressedBody};
//! use reth_ethereum_primitives::TransactionSigned;
//!
//! let body: BlockBody<Bytes> = BlockBody {
@@ -53,7 +53,9 @@
//!
//! ```rust
//! use alloy_consensus::{Eip658Value, Receipt, ReceiptEnvelope, ReceiptWithBloom};
-//! use reth_era::{common::decode::DecodeCompressed, era1::types::execution::CompressedReceipts};
+//! use reth_era::{
+//!     common::decode::DecodeCompressedRlp, era1::types::execution::CompressedReceipts,
+//! };
//!
//! let receipt =
//!     Receipt { status: Eip658Value::Eip658(true), cumulative_gas_used: 21000, logs: vec![] };
@@ -68,7 +70,7 @@
//! ```

use crate::{
-    common::decode::DecodeCompressed,
+    common::decode::DecodeCompressedRlp,
    e2s::{error::E2sError, types::Entry},
};
use alloy_consensus::{Block, BlockBody, Header};
@@ -223,7 +225,7 @@ impl CompressedHeader {
    }
}

-impl DecodeCompressed for CompressedHeader {
+impl DecodeCompressedRlp for CompressedHeader {
    fn decode<T: Decodable>(&self) -> Result<T, E2sError> {
        let decoder = SnappyRlpCodec::<T>::new();
        decoder.decode(&self.data)
@@ -310,7 +312,7 @@ impl CompressedBody {
    }
}

-impl DecodeCompressed for CompressedBody {
+impl DecodeCompressedRlp for CompressedBody {
    fn decode<T: Decodable>(&self) -> Result<T, E2sError> {
        let decoder = SnappyRlpCodec::<T>::new();
        decoder.decode(&self.data)
@@ -401,7 +403,7 @@ impl CompressedReceipts {
    }
}

-impl DecodeCompressed for CompressedReceipts {
+impl DecodeCompressedRlp for CompressedReceipts {
    fn decode<T: Decodable>(&self) -> Result<T, E2sError> {
        let decoder = SnappyRlpCodec::<T>::new();
        decoder.decode(&self.data)

@@ -174,7 +174,7 @@ impl EraFileId for Era1Id {
mod tests {
    use super::*;
    use crate::{
-        common::decode::DecodeCompressed,
+        common::decode::DecodeCompressedRlp,
        test_utils::{create_sample_block, create_test_block_with_compressed_data},
    };
    use alloy_consensus::ReceiptWithBloom;
crates/era/tests/it/era/dd.rs (new file, 190 lines)
@@ -0,0 +1,190 @@
//! Simple decoding and decompressing tests
//! for mainnet era files

use reth_era::{
    common::file_ops::{StreamReader, StreamWriter},
    era::file::{EraReader, EraWriter},
};
use std::io::Cursor;

use crate::{EraTestDownloader, HOODI};

// Helper function to test decompression and decoding for a specific era file
async fn test_era_file_decompression_and_decoding(
    downloader: &EraTestDownloader,
    filename: &str,
    network: &str,
) -> eyre::Result<()> {
    println!("\nTesting file: {filename}");
    let file = downloader.open_era_file(filename, network).await?;

    // Handle genesis era separately
    if file.group.is_genesis() {
        // Genesis has no blocks
        assert_eq!(file.group.blocks.len(), 0, "Genesis should have no blocks");
        assert!(file.group.slot_index.is_none(), "Genesis should not have block slot index");

        // Test genesis state decompression
        let state_data = file.group.era_state.decompress()?;
        assert!(!state_data.is_empty(), "Genesis state should decompress to non-empty data");

        // Verify state slot index
        assert_eq!(
            file.group.state_slot_index.slot_count(),
            1,
            "Genesis state index should have count of 1"
        );

        let mut buffer = Vec::new();
        {
            let mut writer = EraWriter::new(&mut buffer);
            writer.write_file(&file)?;
        }

        let reader = EraReader::new(Cursor::new(&buffer));
        let read_back_file = reader.read(file.id.network_name.clone())?;

        assert_eq!(
            file.group.era_state.decompress()?,
            read_back_file.group.era_state.decompress()?,
            "Genesis state data should be identical"
        );

        println!("Genesis era verified successfully");
        return Ok(());
    }

    // Non-genesis era - test beacon blocks
    println!(
        " Non-genesis era with {} beacon blocks, starting at slot {}",
        file.group.blocks.len(),
        file.group.starting_slot()
    );

    // Test beacon block decompression across different positions
    let test_block_indices = [
        0,                           // First block
        file.group.blocks.len() / 2, // Middle block
        file.group.blocks.len() - 1, // Last block
    ];

    for &block_idx in &test_block_indices {
        let block = &file.group.blocks[block_idx];
        let slot = file.group.starting_slot() + block_idx as u64;

        println!(
            "\n Testing beacon block at slot {}, compressed size: {} bytes",
            slot,
            block.data.len()
        );

        // Test beacon block decompression
        let block_data = block.decompress()?;
        assert!(
            !block_data.is_empty(),
            "Beacon block at slot {slot} decompression should produce non-empty data"
        );
    }

    // Test era state decompression
    let state_data = file.group.era_state.decompress()?;
    assert!(!state_data.is_empty(), "Era state decompression should produce non-empty data");
    println!(" Era state decompressed: {} bytes", state_data.len());

    // Verify slot indices
    if let Some(ref block_slot_index) = file.group.slot_index {
        println!(
            " Block slot index: starting_slot={}, count={}",
            block_slot_index.starting_slot,
            block_slot_index.slot_count()
        );

        // Check for empty slots
        let empty_slots: Vec<usize> = (0..block_slot_index.slot_count())
            .filter(|&i| !block_slot_index.has_data_at_slot(i))
            .collect();

        if !empty_slots.is_empty() {
            println!(
                " Found {} empty slots (first few): {:?}",
                empty_slots.len(),
                &empty_slots[..empty_slots.len().min(5)]
            );
        }
    }

    // Test round-trip serialization
    let mut buffer = Vec::new();
    {
        let mut writer = EraWriter::new(&mut buffer);
        writer.write_file(&file)?;
    }

    // Read back from buffer
    let reader = EraReader::new(Cursor::new(&buffer));
    let read_back_file = reader.read(file.id.network_name.clone())?;

    // Verify basic properties are preserved
    assert_eq!(file.id.network_name, read_back_file.id.network_name);
    assert_eq!(file.id.start_slot, read_back_file.id.start_slot);
    assert_eq!(file.id.slot_count, read_back_file.id.slot_count);
    assert_eq!(file.group.blocks.len(), read_back_file.group.blocks.len());

    // Test data preservation for beacon blocks
    for &idx in &test_block_indices {
        let original_block = &file.group.blocks[idx];
        let read_back_block = &read_back_file.group.blocks[idx];
        let slot = file.group.starting_slot() + idx as u64;

        // Test that decompressed data is identical
        assert_eq!(
            original_block.decompress()?,
            read_back_block.decompress()?,
            "Beacon block data should be identical for slot {slot}"
        );
    }

    // Test state data preservation
    assert_eq!(
        file.group.era_state.decompress()?,
        read_back_file.group.era_state.decompress()?,
        "Era state data should be identical"
    );

    // Test slot indices preservation
    if let (Some(original_index), Some(read_index)) =
        (&file.group.slot_index, &read_back_file.group.slot_index)
    {
        assert_eq!(
            original_index.starting_slot, read_index.starting_slot,
            "Block slot index starting slot should match"
        );
        assert_eq!(
            original_index.offsets, read_index.offsets,
            "Block slot index offsets should match"
        );
    }

    assert_eq!(
        file.group.state_slot_index.starting_slot,
        read_back_file.group.state_slot_index.starting_slot,
        "State slot index starting slot should match"
    );
    assert_eq!(
        file.group.state_slot_index.offsets, read_back_file.group.state_slot_index.offsets,
        "State slot index offsets should match"
    );

    Ok(())
}

#[test_case::test_case("hoodi-00000-212f13fc.era"; "era_dd_hoodi_0")]
#[test_case::test_case("hoodi-00021-857e418b.era"; "era_dd_hoodi_21")]
#[test_case::test_case("hoodi-00175-202aaa6d.era"; "era_dd_hoodi_175")]
#[test_case::test_case("hoodi-00201-0d521fc8.era"; "era_dd_hoodi_201")]
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_hoodi_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
    let downloader = EraTestDownloader::new().await?;
    test_era_file_decompression_and_decoding(&downloader, filename, HOODI).await
}
crates/era/tests/it/era/genesis.rs (new file, 37 lines)
@@ -0,0 +1,37 @@
//! Genesis block tests for `era1` files.
//!
//! These tests verify proper decompression and decoding of genesis blocks
//! from different networks.

use crate::{EraTestDownloader, ERA_HOODI_FILES_NAMES, HOODI};

#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_hoodi_genesis_era_decompression() -> eyre::Result<()> {
    let downloader = EraTestDownloader::new().await?;

    let file = downloader.open_era_file(ERA_HOODI_FILES_NAMES[0], HOODI).await?;

    // Verify this is genesis era
    assert!(file.group.is_genesis(), "First file should be genesis era");
    assert_eq!(file.group.starting_slot(), 0, "Genesis should start at slot 0");

    // Genesis era has no blocks
    assert_eq!(file.group.blocks.len(), 0, "Genesis era should have no blocks");

    // Genesis should not have block slot index
    assert!(file.group.slot_index.is_none(), "Genesis should not have block slot index");

    // Test state decompression
    let state_data = file.group.era_state.decompress()?;
    assert!(!state_data.is_empty(), "Decompressed state should not be empty");

    // Verify state slot index
    assert_eq!(
        file.group.state_slot_index.slot_count(),
        1,
        "Genesis state index should have count of 1"
    );

    Ok(())
}
crates/era/tests/it/era/mod.rs (new file, 2 lines)
@@ -0,0 +1,2 @@
mod dd;
mod genesis;
@@ -14,11 +14,11 @@ use reth_era::{
use reth_ethereum_primitives::TransactionSigned;
use std::io::Cursor;

-use crate::{Era1TestDownloader, MAINNET};
+use crate::{EraTestDownloader, MAINNET};

-// Helper function to test decompression and decoding for a specific file
+// Helper function to test decompression and decoding for a specific era1 file
async fn test_file_decompression(
-    downloader: &Era1TestDownloader,
+    downloader: &EraTestDownloader,
    filename: &str,
) -> eyre::Result<()> {
    println!("\nTesting file: {filename}");
@@ -154,6 +154,6 @@ async fn test_file_decompression(
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_mainnet_era1_file_decompression_and_decoding(filename: &str) -> eyre::Result<()> {
-    let downloader = Era1TestDownloader::new().await?;
+    let downloader = EraTestDownloader::new().await?;
    test_file_decompression(&downloader, filename).await
}
@@ -4,7 +4,7 @@
//! from different networks.

use crate::{
-    Era1TestDownloader, ERA1_MAINNET_FILES_NAMES, ERA1_SEPOLIA_FILES_NAMES, MAINNET, SEPOLIA,
+    EraTestDownloader, ERA1_MAINNET_FILES_NAMES, ERA1_SEPOLIA_FILES_NAMES, MAINNET, SEPOLIA,
};
use alloy_consensus::{BlockBody, Header};
use reth_era::{e2s::types::IndexEntry, era1::types::execution::CompressedBody};
@@ -13,7 +13,7 @@ use reth_ethereum_primitives::TransactionSigned;
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_mainnet_genesis_block_decompression() -> eyre::Result<()> {
-    let downloader = Era1TestDownloader::new().await?;
+    let downloader = EraTestDownloader::new().await?;

    let file = downloader.open_era1_file(ERA1_MAINNET_FILES_NAMES[0], MAINNET).await?;

@@ -65,7 +65,7 @@ async fn test_mainnet_genesis_block_decompression() -> eyre::Result<()> {
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_sepolia_genesis_block_decompression() -> eyre::Result<()> {
-    let downloader = Era1TestDownloader::new().await?;
+    let downloader = EraTestDownloader::new().await?;

    let file = downloader.open_era1_file(ERA1_SEPOLIA_FILES_NAMES[0], SEPOLIA).await?;
crates/era/tests/it/era1/mod.rs (new file, 3 lines)
@@ -0,0 +1,3 @@
mod dd;
mod genesis;
mod roundtrip;
@@ -24,11 +24,11 @@ use reth_era::{
use reth_ethereum_primitives::TransactionSigned;
use std::io::Cursor;

-use crate::{Era1TestDownloader, MAINNET, SEPOLIA};
+use crate::{EraTestDownloader, MAINNET, SEPOLIA};

// Helper function to test roundtrip compression/encoding for a specific file
async fn test_file_roundtrip(
-    downloader: &Era1TestDownloader,
+    downloader: &EraTestDownloader,
    filename: &str,
    network: &str,
) -> eyre::Result<()> {
@@ -259,7 +259,7 @@ async fn test_file_roundtrip(
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Result<()> {
-    let downloader = Era1TestDownloader::new().await?;
+    let downloader = EraTestDownloader::new().await?;
    test_file_roundtrip(&downloader, filename, MAINNET).await
}

@@ -270,7 +270,7 @@ async fn test_roundtrip_compression_encoding_mainnet(filename: &str) -> eyre::Re
#[tokio::test(flavor = "multi_thread")]
#[ignore = "download intensive"]
async fn test_roundtrip_compression_encoding_sepolia(filename: &str) -> eyre::Result<()> {
-    let downloader = Era1TestDownloader::new().await?;
+    let downloader = EraTestDownloader::new().await?;

    test_file_roundtrip(&downloader, filename, SEPOLIA).await?;
@@ -8,8 +8,9 @@

use reqwest::{Client, Url};
use reth_era::{
-    common::file_ops::FileReader,
+    common::file_ops::{EraFileType, FileReader},
    e2s::error::E2sError,
+    era::file::{EraFile, EraReader},
    era1::file::{Era1File, Era1Reader},
};
use reth_era_downloader::EraClient;
@@ -23,9 +24,8 @@ use std::{
use eyre::{eyre, Result};
use tempfile::TempDir;

-mod dd;
-mod genesis;
-mod roundtrip;
+mod era;
+mod era1;

const fn main() {}
@@ -33,7 +33,7 @@ const fn main() {}
const MAINNET: &str = "mainnet";
/// Default mainnet url
/// for downloading mainnet `.era1` files
-const MAINNET_URL: &str = "https://era.ithaca.xyz/era1/";
+const ERA1_MAINNET_URL: &str = "https://era.ithaca.xyz/era1/";

/// Succinct list of mainnet files we want to download
/// from <https://era.ithaca.xyz/era1/>
@@ -54,7 +54,7 @@ const SEPOLIA: &str = "sepolia";

/// Default sepolia url
/// for downloading sepolia `.era1` files
-const SEPOLIA_URL: &str = "https://era.ithaca.xyz/sepolia-era1/";
+const ERA1_SEPOLIA_URL: &str = "https://era.ithaca.xyz/sepolia-era1/";

/// Succinct list of sepolia files we want to download
/// from <https://era.ithaca.xyz/sepolia-era1/>
@@ -66,18 +66,50 @@ const ERA1_SEPOLIA_FILES_NAMES: [&str; 4] = [
    "sepolia-00182-a4f0a8a1.era1",
];

const HOODI: &str = "hoodi";

/// Default hoodi url
/// for downloading hoodi `.era` files
/// TODO: to replace with internal era files hosting url
const ERA_HOODI_URL: &str = "https://hoodi.era.nimbus.team/";

/// Succinct list of hoodi files we want to download
/// from <https://hoodi.era.nimbus.team/> //TODO: to replace with internal era files hosting url
/// for testing purposes
const ERA_HOODI_FILES_NAMES: [&str; 4] = [
    "hoodi-00000-212f13fc.era",
    "hoodi-00021-857e418b.era",
    "hoodi-00175-202aaa6d.era",
    "hoodi-00201-0d521fc8.era",
];

/// Default mainnet url
/// for downloading mainnet `.era` files
//TODO: to replace with internal era files hosting url
const ERA_MAINNET_URL: &str = "https://mainnet.era.nimbus.team/";

/// Succinct list of mainnet files we want to download
/// from <https://mainnet.era.nimbus.team/> //TODO: to replace with internal era files hosting url
/// for testing purposes
const ERA_MAINNET_FILES_NAMES: [&str; 4] = [
    "mainnet-00000-4b363db9.era",
    "mainnet-00518-4e267a3a.era",
    "mainnet-01140-f70d4869.era",
    "mainnet-01581-82073d28.era",
];

/// Utility for downloading `.era1` files for tests
/// in a temporary directory
/// and caching them in memory
#[derive(Debug)]
-struct Era1TestDownloader {
+struct EraTestDownloader {
    /// Temporary directory for storing downloaded files
    temp_dir: TempDir,
    /// Cache mapping file names to their paths
    file_cache: Arc<Mutex<HashMap<String, PathBuf>>>,
}

-impl Era1TestDownloader {
+impl EraTestDownloader {
    /// Create a new downloader instance with a temporary directory
    async fn new() -> Result<Self> {
        let temp_dir =
@@ -97,29 +129,9 @@ impl Era1TestDownloader {
        }

-        // check if the filename is supported
-        if !ERA1_MAINNET_FILES_NAMES.contains(&filename) &&
-            !ERA1_SEPOLIA_FILES_NAMES.contains(&filename)
-        {
-            return Err(eyre!(
-                "Unknown file: {}. Only the following files are supported: {:?} or {:?}",
-                filename,
-                ERA1_MAINNET_FILES_NAMES,
-                ERA1_SEPOLIA_FILES_NAMES
-            ));
-        }
-
-        // initialize the client and build url config
-        let url = match network {
-            MAINNET => MAINNET_URL,
-            SEPOLIA => SEPOLIA_URL,
-            _ => {
-                return Err(eyre!(
-                    "Unknown network: {}. Only mainnet and sepolia are supported.",
-                    network
-                ));
-            }
-        };
+        self.validate_filename(filename, network)?;
+
+        let (url, _): (&str, &[&str]) = self.get_network_config(filename, network)?;
        let final_url = Url::from_str(url).map_err(|e| eyre!("Failed to parse URL: {}", e))?;

        let folder = self.temp_dir.path();
@@ -142,6 +154,7 @@ impl Era1TestDownloader {
            .download_to_file(file_url)
            .await
            .map_err(|e| eyre!("Failed to download file: {}", e))?;

        // update the cache
        {
            let mut cache = self.file_cache.lock().unwrap();
@@ -151,9 +164,54 @@ impl Era1TestDownloader {
        Ok(downloaded_path.to_path_buf())
    }

    /// Validate that filename is in the supported list for the network
    fn validate_filename(&self, filename: &str, network: &str) -> Result<()> {
        let (_, supported_files) = self.get_network_config(filename, network)?;

        if !supported_files.contains(&filename) {
            return Err(eyre!(
                "Unknown file: '{}' for network '{}'. Supported files: {:?}",
                filename,
                network,
                supported_files
            ));
        }

        Ok(())
    }

    /// Get network configuration, URL and supported files, based on network and file type
    fn get_network_config(
        &self,
        filename: &str,
        network: &str,
    ) -> Result<(&'static str, &'static [&'static str])> {
        let file_type = EraFileType::from_filename(filename)
            .ok_or_else(|| eyre!("Unknown file extension for: {}", filename))?;

        match (network, file_type) {
            (MAINNET, EraFileType::Era1) => Ok((ERA1_MAINNET_URL, &ERA1_MAINNET_FILES_NAMES[..])),
            (MAINNET, EraFileType::Era) => Ok((ERA_MAINNET_URL, &ERA_MAINNET_FILES_NAMES[..])),
            (SEPOLIA, EraFileType::Era1) => Ok((ERA1_SEPOLIA_URL, &ERA1_SEPOLIA_FILES_NAMES[..])),
            (HOODI, EraFileType::Era) => Ok((ERA_HOODI_URL, &ERA_HOODI_FILES_NAMES[..])),
            _ => Err(eyre!(
                "Unsupported combination: network '{}' with file type '{:?}'",
                network,
                file_type
            )),
        }
    }

    /// open .era1 file, downloading it if necessary
    async fn open_era1_file(&self, filename: &str, network: &str) -> Result<Era1File> {
        let path = self.download_file(filename, network).await?;
        Era1Reader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))
    }

    /// open .era file, downloading it if necessary
    #[allow(dead_code)]
    async fn open_era_file(&self, filename: &str, network: &str) -> Result<EraFile> {
        let path = self.download_file(filename, network).await?;
        EraReader::open(&path, network).map_err(|e| eyre!("Failed to open Era1 file: {e}"))
    }
}