refactor(era-downloader): support both era and era1 file types in downloader (#19617)

This commit is contained in:
Léa Narzis
2025-11-13 20:26:21 +01:00
committed by GitHub
parent c5b7d4a58a
commit 96993dd073
16 changed files with 1796 additions and 71 deletions

1
Cargo.lock generated
View File

@@ -8287,6 +8287,7 @@ dependencies = [
"futures",
"futures-util",
"reqwest",
"reth-era",
"reth-fs-util",
"sha2",
"tempfile",

View File

@@ -15,6 +15,7 @@ alloy-primitives.workspace = true
# reth
reth-fs-util.workspace = true
reth-era.workspace = true
# http
bytes.workspace = true

View File

@@ -3,14 +3,18 @@ use bytes::Bytes;
use eyre::{eyre, OptionExt};
use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use reqwest::{Client, IntoUrl, Url};
use reth_era::common::file_ops::EraFileType;
use sha2::{Digest, Sha256};
use std::{future::Future, path::Path, str::FromStr};
use tokio::{
fs::{self, File},
io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
join, try_join,
try_join,
};
/// Downloaded index page filename
const INDEX_HTML_FILE: &str = "index.html";
/// Accesses the network over HTTP.
pub trait HttpClient {
/// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
@@ -41,6 +45,7 @@ pub struct EraClient<Http> {
client: Http,
url: Url,
folder: Box<Path>,
era_type: EraFileType,
}
impl<Http: HttpClient + Clone> EraClient<Http> {
@@ -48,7 +53,8 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
/// Constructs [`EraClient`] using `client` to download from `url` into `folder`.
pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
Self { client, url, folder: folder.into() }
let era_type = EraFileType::from_url(url.as_str());
Self { client, url, folder: folder.into(), era_type }
}
/// Performs a GET request on `url` and stores the response body into a file located within
@@ -92,9 +98,11 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
}
}
self.assert_checksum(number, actual_checksum?)
.await
.map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
if self.era_type == EraFileType::Era1 {
self.assert_checksum(number, actual_checksum?)
.await
.map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
}
}
Ok(path.into_boxed_path())
@@ -145,9 +153,11 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
pub async fn files_count(&self) -> usize {
let mut count = 0usize;
let file_extension = self.era_type.extension().trim_start_matches('.');
if let Ok(mut dir) = fs::read_dir(&self.folder).await {
while let Ok(Some(entry)) = dir.next_entry().await {
if entry.path().extension() == Some("era1".as_ref()) {
if entry.path().extension() == Some(file_extension.as_ref()) {
count += 1;
}
}
@@ -156,46 +166,35 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
count
}
/// Fetches the list of ERA1 files from `url` and stores it in a file located within `folder`.
/// Fetches the list of ERA1/ERA files from `url` and stores it in a file located within
/// `folder`.
/// For era files, checksum.txt file does not exist, so the checksum verification is
/// skipped.
pub async fn fetch_file_list(&self) -> eyre::Result<()> {
let (mut index, mut checksums) = try_join!(
self.client.get(self.url.clone()),
self.client.get(self.url.clone().join(Self::CHECKSUMS)?),
)?;
let index_path = self.folder.to_path_buf().join("index.html");
let index_path = self.folder.to_path_buf().join(INDEX_HTML_FILE);
let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
let (mut index_file, mut checksums_file) =
try_join!(File::create(&index_path), File::create(&checksums_path))?;
loop {
let (index, checksums) = join!(index.next(), checksums.next());
let (index, checksums) = (index.transpose()?, checksums.transpose()?);
if index.is_none() && checksums.is_none() {
break;
}
let index_file = &mut index_file;
let checksums_file = &mut checksums_file;
// Only for era1, we download also checksums file
if self.era_type == EraFileType::Era1 {
let checksums_url = self.url.join(Self::CHECKSUMS)?;
try_join!(
async move {
if let Some(index) = index {
io::copy(&mut index.as_ref(), index_file).await?;
}
Ok::<(), eyre::Error>(())
},
async move {
if let Some(checksums) = checksums {
io::copy(&mut checksums.as_ref(), checksums_file).await?;
}
Ok::<(), eyre::Error>(())
},
self.download_file_to_path(self.url.clone(), &index_path),
self.download_file_to_path(checksums_url, &checksums_path)
)?;
} else {
// Download only index file
self.download_file_to_path(self.url.clone(), &index_path).await?;
}
let file = File::open(&index_path).await?;
// Parse and extract era filenames from index.html
self.extract_era_filenames(&index_path).await?;
Ok(())
}
/// Extracts ERA filenames from `index.html` and writes them to the index file
async fn extract_era_filenames(&self, index_path: &Path) -> eyre::Result<()> {
let file = File::open(index_path).await?;
let reader = io::BufReader::new(file);
let mut lines = reader.lines();
@@ -203,21 +202,36 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
let file = File::create(&path).await?;
let mut writer = io::BufWriter::new(file);
let ext = self.era_type.extension();
let ext_len = ext.len();
while let Some(line) = lines.next_line().await? {
if let Some(j) = line.find(".era1") &&
if let Some(j) = line.find(ext) &&
let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-')
{
let era = &line[i + 1..j + 5];
let era = &line[i + 1..j + ext_len];
writer.write_all(era.as_bytes()).await?;
writer.write_all(b"\n").await?;
}
}
writer.flush().await?;
Ok(())
}
// Helper to download a file to a specified path
async fn download_file_to_path(&self, url: Url, path: &Path) -> eyre::Result<()> {
let mut stream = self.client.get(url).await?;
let mut file = File::create(path).await?;
while let Some(item) = stream.next().await.transpose()? {
io::copy(&mut item.as_ref(), &mut file).await?;
}
Ok(())
}
/// Returns ERA1 file name that is ordered at `number`.
/// Returns ERA1/ERA file name that is ordered at `number`.
pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
let path = self.folder.to_path_buf().join("index");
let file = File::open(&path).await?;
@@ -235,18 +249,23 @@ impl<Http: HttpClient + Clone> EraClient<Http> {
match File::open(path).await {
Ok(file) => {
let number = self
.file_name_to_number(name)
.ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
if self.era_type == EraFileType::Era1 {
let number = self
.file_name_to_number(name)
.ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
let actual_checksum = checksum(file).await?;
let is_verified = self.verify_checksum(number, actual_checksum).await?;
let actual_checksum = checksum(file).await?;
let is_verified = self.verify_checksum(number, actual_checksum).await?;
if !is_verified {
fs::remove_file(path).await?;
if !is_verified {
fs::remove_file(path).await?;
}
Ok(is_verified)
} else {
// For era files, we skip checksum verification, as checksum.txt does not exist
Ok(true)
}
Ok(is_verified)
}
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e)?,

View File

@@ -61,21 +61,21 @@ impl HttpClient for FailingClient {
let url = url.into_url().unwrap();
Ok(futures::stream::iter(vec![Ok(match url.as_str() {
"https://mainnet.era1.nimbus.team/" => Bytes::from_static(crate::NIMBUS),
"https://era1.ethportal.net/" => Bytes::from_static(crate::ETH_PORTAL),
"https://era.ithaca.xyz/era1/index.html" => Bytes::from_static(crate::ITHACA),
"https://mainnet.era1.nimbus.team/" => Bytes::from_static(crate::ERA1_NIMBUS),
"https://era1.ethportal.net/" => Bytes::from_static(crate::ERA1_ETH_PORTAL),
"https://era.ithaca.xyz/era1/index.html" => Bytes::from_static(crate::ERA1_ITHACA),
"https://mainnet.era1.nimbus.team/checksums.txt" |
"https://era1.ethportal.net/checksums.txt" |
"https://era.ithaca.xyz/era1/checksums.txt" => Bytes::from_static(CHECKSUMS),
"https://era1.ethportal.net/mainnet-00000-5ec1ffb8.era1" |
"https://mainnet.era1.nimbus.team/mainnet-00000-5ec1ffb8.era1" |
"https://era.ithaca.xyz/era1/mainnet-00000-5ec1ffb8.era1" => {
Bytes::from_static(crate::MAINNET_0)
Bytes::from_static(crate::ERA1_MAINNET_0)
}
"https://era1.ethportal.net/mainnet-00001-a5364e9a.era1" |
"https://mainnet.era1.nimbus.team/mainnet-00001-a5364e9a.era1" |
"https://era.ithaca.xyz/era1/mainnet-00001-a5364e9a.era1" => {
Bytes::from_static(crate::MAINNET_1)
Bytes::from_static(crate::ERA1_MAINNET_1)
}
v => unimplemented!("Unexpected URL \"{v}\""),
})]))

View File

@@ -10,7 +10,7 @@ use test_case::test_case;
#[test_case("https://era1.ethportal.net/"; "ethportal")]
#[test_case("https://era.ithaca.xyz/era1/index.html"; "ithaca")]
#[tokio::test]
async fn test_getting_file_url_after_fetching_file_list(url: &str) {
async fn test_getting_era1_file_url_after_fetching_file_list(url: &str) {
let base_url = Url::from_str(url).unwrap();
let folder = tempdir().unwrap();
let folder = folder.path();
@@ -48,3 +48,19 @@ async fn test_getting_file_after_fetching_file_list(url: &str) {
let actual_count = client.files_count().await;
assert_eq!(actual_count, expected_count);
}
#[test_case("https://mainnet.era.nimbus.team/"; "nimbus")]
#[tokio::test]
async fn test_getting_era_file_url_after_fetching_file_list(url: &str) {
let base_url = Url::from_str(url).unwrap();
let folder = tempdir().unwrap();
let folder = folder.path();
let client = EraClient::new(StubClient, base_url.clone(), folder);
client.fetch_file_list().await.unwrap();
let expected_url = Some(base_url.join("mainnet-00000-4b363db9.era").unwrap());
let actual_url = client.url(0).await.unwrap();
assert_eq!(actual_url, expected_url);
}

View File

@@ -10,7 +10,7 @@ use test_case::test_case;
#[test_case("https://era1.ethportal.net/"; "ethportal")]
#[test_case("https://era.ithaca.xyz/era1/index.html"; "ithaca")]
#[tokio::test]
async fn test_getting_file_name_after_fetching_file_list(url: &str) {
async fn test_getting_era1_file_name_after_fetching_file_list(url: &str) {
let url = Url::from_str(url).unwrap();
let folder = tempdir().unwrap();
let folder = folder.path();
@@ -23,3 +23,19 @@ async fn test_getting_file_name_after_fetching_file_list(url: &str) {
assert_eq!(actual, expected);
}
#[test_case("https://mainnet.era.nimbus.team/"; "nimbus")]
#[tokio::test]
async fn test_getting_era_file_name_after_fetching_file_list(url: &str) {
let url = Url::from_str(url).unwrap();
let folder = tempdir().unwrap();
let folder = folder.path();
let client = EraClient::new(StubClient, url, folder);
client.fetch_file_list().await.unwrap();
let actual = client.number_to_file_name(500).await.unwrap();
let expected = Some("mainnet-00500-87109713.era".to_owned());
assert_eq!(actual, expected);
}

View File

@@ -13,12 +13,20 @@ use futures::Stream;
use reqwest::IntoUrl;
use reth_era_downloader::HttpClient;
pub(crate) const NIMBUS: &[u8] = include_bytes!("../res/nimbus.html");
pub(crate) const ETH_PORTAL: &[u8] = include_bytes!("../res/ethportal.html");
pub(crate) const ITHACA: &[u8] = include_bytes!("../res/ithaca.html");
pub(crate) const CHECKSUMS: &[u8] = include_bytes!("../res/checksums.txt");
pub(crate) const MAINNET_0: &[u8] = include_bytes!("../res/mainnet-00000-5ec1ffb8.era1");
pub(crate) const MAINNET_1: &[u8] = include_bytes!("../res/mainnet-00001-a5364e9a.era1");
pub(crate) const ERA1_NIMBUS: &[u8] = include_bytes!("../res/era1-nimbus.html");
pub(crate) const ERA1_ETH_PORTAL: &[u8] = include_bytes!("../res/ethportal.html");
pub(crate) const ERA1_ITHACA: &[u8] = include_bytes!("../res/era1-ithaca.html");
pub(crate) const ERA1_CHECKSUMS: &[u8] = include_bytes!("../res/checksums.txt");
pub(crate) const ERA1_MAINNET_0: &[u8] =
include_bytes!("../res/era1-files/mainnet-00000-5ec1ffb8.era1");
pub(crate) const ERA1_MAINNET_1: &[u8] =
include_bytes!("../res/era1-files/mainnet-00001-a5364e9a.era1");
pub(crate) const ERA_NIMBUS: &[u8] = include_bytes!("../res/era-nimbus.html");
pub(crate) const ERA_MAINNET_0: &[u8] =
include_bytes!("../res/era-files/mainnet-00000-4b363db9.era");
pub(crate) const ERA_MAINNET_1: &[u8] =
include_bytes!("../res/era-files/mainnet-00001-40cf2f3c.era");
/// An HTTP client pre-programmed with canned answers to received calls.
/// Panics if it receives an unknown call.
@@ -33,22 +41,32 @@ impl HttpClient for StubClient {
let url = url.into_url().unwrap();
Ok(futures::stream::iter(vec![Ok(match url.as_str() {
"https://mainnet.era1.nimbus.team/" => Bytes::from_static(NIMBUS),
"https://era1.ethportal.net/" => Bytes::from_static(ETH_PORTAL),
"https://era.ithaca.xyz/era1/index.html" => Bytes::from_static(ITHACA),
// Era1 urls
"https://mainnet.era1.nimbus.team/" => Bytes::from_static(ERA1_NIMBUS),
"https://era1.ethportal.net/" => Bytes::from_static(ERA1_ETH_PORTAL),
"https://era.ithaca.xyz/era1/index.html" => Bytes::from_static(ERA1_ITHACA),
"https://mainnet.era1.nimbus.team/checksums.txt" |
"https://era1.ethportal.net/checksums.txt" |
"https://era.ithaca.xyz/era1/checksums.txt" => Bytes::from_static(CHECKSUMS),
"https://era.ithaca.xyz/era1/checksums.txt" => Bytes::from_static(ERA1_CHECKSUMS),
"https://era1.ethportal.net/mainnet-00000-5ec1ffb8.era1" |
"https://mainnet.era1.nimbus.team/mainnet-00000-5ec1ffb8.era1" |
"https://era.ithaca.xyz/era1/mainnet-00000-5ec1ffb8.era1" => {
Bytes::from_static(MAINNET_0)
Bytes::from_static(ERA1_MAINNET_0)
}
"https://era1.ethportal.net/mainnet-00001-a5364e9a.era1" |
"https://mainnet.era1.nimbus.team/mainnet-00001-a5364e9a.era1" |
"https://era.ithaca.xyz/era1/mainnet-00001-a5364e9a.era1" => {
Bytes::from_static(MAINNET_1)
Bytes::from_static(ERA1_MAINNET_1)
}
// Era urls
"https://mainnet.era.nimbus.team/" => Bytes::from_static(ERA_NIMBUS),
"https://mainnet.era.nimbus.team/mainnet-00000-4b363db9.era" => {
Bytes::from_static(ERA_MAINNET_0)
}
"https://mainnet.era.nimbus.team/mainnet-00001-40cf2f3c.era" => {
Bytes::from_static(ERA_MAINNET_1)
}
v => unimplemented!("Unexpected URL \"{v}\""),
})]))
}

View File

@@ -34,7 +34,7 @@ async fn test_streaming_files_after_fetching_file_list(url: &str) {
}
#[tokio::test]
async fn test_streaming_files_after_fetching_file_list_into_missing_folder_fails() {
async fn test_streaming_era1_files_after_fetching_file_list_into_missing_folder_fails() {
let base_url = Url::from_str("https://era.ithaca.xyz/era1/index.html").unwrap();
let folder = tempdir().unwrap().path().to_owned();
let client = EraClient::new(StubClient, base_url, folder);
@@ -49,3 +49,20 @@ async fn test_streaming_files_after_fetching_file_list_into_missing_folder_fails
assert_eq!(actual_error, expected_error);
}
#[tokio::test]
async fn test_streaming_era_files_after_fetching_file_list_into_missing_folder_fails() {
let base_url = Url::from_str("https://mainnet.era.nimbus.team").unwrap(); //TODO: change once ithaca host era files
let folder = tempdir().unwrap().path().to_owned();
let client = EraClient::new(StubClient, base_url, folder);
let mut stream = EraStream::new(
client,
EraStreamConfig::default().with_max_files(2).with_max_concurrent_downloads(1),
);
let actual_error = stream.next().await.unwrap().unwrap_err().to_string();
let expected_error = "No such file or directory (os error 2)".to_owned();
assert_eq!(actual_error, expected_error);
}

File diff suppressed because it is too large Load Diff

View File

@@ -122,3 +122,45 @@ impl<T: StreamWriter<File>> FileWriter for T {
Self::create(path, file)
}
}
/// Era file type identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EraFileType {
/// Consensus layer ERA file, `.era`
/// Contains beacon blocks and states
Era,
/// Execution layer ERA1 file, `.era1`
/// Contains execution blocks pre-merge
Era1,
}
impl EraFileType {
/// Get the file extension for this type, dot included
pub const fn extension(&self) -> &'static str {
match self {
Self::Era => ".era",
Self::Era1 => ".era1",
}
}
/// Detect file type from a filename
pub fn from_filename(filename: &str) -> Option<Self> {
if filename.ends_with(".era") {
Some(Self::Era)
} else if filename.ends_with(".era1") {
Some(Self::Era1)
} else {
None
}
}
/// Detect file type from URL
/// By default, it assumes `Era` type
pub fn from_url(url: &str) -> Self {
if url.contains("era1") {
Self::Era1
} else {
Self::Era
}
}
}