feat: node import supports importing gzip-compressed files (#17877)

Signed-off-by: tmelhao <tmel0103@gmail.com>
Co-authored-by: tmelhao <tmel0103@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Author: Haotian
Date: 2025-09-25 05:08:54 +08:00
Committed by: GitHub
Parent: bdc59799d0
Commit: 8f804d385d
5 changed files with 237 additions and 71 deletions


@@ -40,6 +40,7 @@ pin-project.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["codec"] }
async-compression = { workspace = true, features = ["gzip", "tokio"] }
# metrics
reth-metrics.workspace = true
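
This hunk adds the async-compression dependency with the gzip and tokio features. For orientation, a minimal sketch of the API it provides (the `read_gzip` helper is illustrative, not part of the diff):

    use async_compression::tokio::bufread::GzipDecoder;
    use tokio::{
        fs::File,
        io::{AsyncReadExt, BufReader},
    };

    // GzipDecoder wraps any buffered AsyncRead and implements AsyncRead itself,
    // so callers pull decompressed bytes with ordinary `read` calls.
    async fn read_gzip(path: &str) -> std::io::Result<Vec<u8>> {
        let file = File::open(path).await?;
        let mut decoder = GzipDecoder::new(BufReader::new(file));
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).await?;
        Ok(decompressed)
    }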


@@ -1,6 +1,7 @@
use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
use async_compression::tokio::bufread::GzipDecoder;
use futures::Future;
use itertools::Either;
use reth_consensus::{Consensus, ConsensusError};
@@ -16,7 +17,10 @@ use reth_network_peers::PeerId;
use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
use thiserror::Error;
-use tokio::{fs::File, io::AsyncReadExt};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, BufReader},
+};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};
@@ -392,13 +396,115 @@ impl<B: FullBlock> BlockClient for FileClient<B> {
type Block = B;
}
/// File reader type for handling different compression formats.
#[derive(Debug)]
enum FileReader {
/// Regular uncompressed file with remaining byte tracking.
Plain { file: File, remaining_bytes: u64 },
/// Gzip compressed file.
Gzip(GzipDecoder<BufReader<File>>),
}
impl FileReader {
/// Read some data into the provided buffer, returning the number of bytes read.
async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
match self {
Self::Plain { file, .. } => file.read(buf).await,
Self::Gzip(decoder) => decoder.read(buf).await,
}
}
    /// Reads the next chunk from the file into `chunk`. Returns the byte length of
    /// the buffered chunk, or `None` once the file is exhausted.
async fn read_next_chunk(
&mut self,
chunk: &mut Vec<u8>,
chunk_byte_len: u64,
) -> Result<Option<u64>, FileClientError> {
match self {
Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
Self::Gzip(_) => {
Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
.then_some(chunk.len() as u64))
}
}
}
async fn read_plain_chunk(
&mut self,
chunk: &mut Vec<u8>,
chunk_byte_len: u64,
) -> Result<Option<u64>, FileClientError> {
let Self::Plain { file, remaining_bytes } = self else {
unreachable!("read_plain_chunk should only be called on Plain variant")
};
if *remaining_bytes == 0 && chunk.is_empty() {
// eof
return Ok(None)
}
let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
let old_bytes_len = chunk.len() as u64;
// calculate reserved space in chunk
let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
// read new bytes from file
let prev_read_bytes_len = chunk.len();
chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
let reader = &mut chunk[prev_read_bytes_len..];
// actual bytes that have been read
let new_read_bytes_len = file.read_exact(reader).await? as u64;
let next_chunk_byte_len = chunk.len();
// update remaining file length
*remaining_bytes -= new_read_bytes_len;
debug!(target: "downloaders::file",
max_chunk_byte_len=chunk_byte_len,
prev_read_bytes_len,
new_read_bytes_target_len,
new_read_bytes_len,
next_chunk_byte_len,
remaining_file_byte_len=*remaining_bytes,
"new bytes were read from file"
);
Ok(Some(next_chunk_byte_len as u64))
}
/// Read next chunk from gzipped file.
async fn read_gzip_chunk(
&mut self,
chunk: &mut Vec<u8>,
chunk_byte_len: u64,
) -> Result<bool, FileClientError> {
loop {
if chunk.len() >= chunk_byte_len as usize {
return Ok(true)
}
let mut buffer = vec![0u8; 64 * 1024];
match self.read(&mut buffer).await {
Ok(0) => return Ok(!chunk.is_empty()),
Ok(n) => {
buffer.truncate(n);
chunk.extend_from_slice(&buffer);
}
Err(e) => return Err(e.into()),
}
}
}
}
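
The enum encodes an asymmetry in how end-of-input is known: a plain file's byte length is read from metadata up front, so an exact-length read can be issued per chunk, while a gzip stream's decompressed size is unknown until the decoder yields a zero-length read. A hedged in-module sketch of that contract (`drain_reader` is hypothetical, not in the diff):

    /// Drains a `FileReader` into `out`. For `Plain`, `remaining_bytes` bounds how
    /// much can still arrive; for `Gzip`, only a zero-length read signals the end.
    async fn drain_reader(reader: &mut FileReader, out: &mut Vec<u8>) -> Result<(), io::Error> {
        let mut buf = [0u8; 8192];
        loop {
            match reader.read(&mut buf).await? {
                0 => return Ok(()), // EOF for both variants
                n => out.extend_from_slice(&buf[..n]),
            }
        }
    }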
/// Chunks file into several [`FileClient`]s.
#[derive(Debug)]
pub struct ChunkedFileReader {
-    /// File to read from.
-    file: File,
-    /// Current file byte length.
-    file_byte_len: u64,
+    /// File reader (either plain or gzip).
+    file: FileReader,
/// Bytes that have been read.
chunk: Vec<u8>,
/// Max bytes per chunk.
@@ -409,97 +515,64 @@ pub struct ChunkedFileReader {
}
impl ChunkedFileReader {
-    /// Returns the remaining file length.
-    pub const fn file_len(&self) -> u64 {
-        self.file_byte_len
-    }
/// Opens the file to import from given path. Returns a new instance. If no chunk byte length
/// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
/// Automatically detects gzip files by extension (.gz, .gzip).
pub async fn new<P: AsRef<Path>>(
path: P,
chunk_byte_len: Option<u64>,
) -> Result<Self, FileClientError> {
let path = path.as_ref();
let file = File::open(path).await?;
let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
-        Self::from_file(file, chunk_byte_len).await
+        Self::from_file(
+            file,
+            chunk_byte_len,
+            path.extension()
+                .and_then(|ext| ext.to_str())
+                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
+        )
+        .await
}
    /// Opens the file to import from given path. Returns a new instance.
-    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
-        // get file len from metadata before reading
-        let metadata = file.metadata().await?;
-        let file_byte_len = metadata.len();
-        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
-    }
-    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
-    /// length and the remaining file length.
-    const fn chunk_len(&self) -> u64 {
-        let Self { chunk_byte_len, file_byte_len, .. } = *self;
-        let file_byte_len = file_byte_len + self.chunk.len() as u64;
-        if chunk_byte_len > file_byte_len {
-            // last chunk
-            file_byte_len
-        } else {
-            chunk_byte_len
-        }
-    }
+    pub async fn from_file(
+        file: File,
+        chunk_byte_len: u64,
+        is_gzip: bool,
+    ) -> Result<Self, FileClientError> {
+        let file_reader = if is_gzip {
+            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
+        } else {
+            let remaining_bytes = file.metadata().await?.len();
+            FileReader::Plain { file, remaining_bytes }
+        };
+        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
+    }
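
Call sites stay a one-liner; compression is decided once at construction. A usage sketch, assuming the type is exported as `reth_downloaders::file_client::ChunkedFileReader` (module path hedged from the workspace layout):

    use reth_downloaders::file_client::{ChunkedFileReader, FileClientError};

    // "blocks.rlp.gz" and "blocks.rlp.gzip" are routed through the gzip decoder
    // by extension; any other path is read as a plain chain file. `None` keeps
    // the DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE chunk size.
    async fn open_import_file(path: &str) -> Result<ChunkedFileReader, FileClientError> {
        ChunkedFileReader::new(path, None).await
    }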
/// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
/// chunk to read.
-    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
-        if self.file_byte_len == 0 && self.chunk.is_empty() {
-            // eof
-            return Ok(None)
-        }
-        let chunk_target_len = self.chunk_len();
-        let old_bytes_len = self.chunk.len() as u64;
-        // calculate reserved space in chunk
-        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
-        // read new bytes from file
-        let prev_read_bytes_len = self.chunk.len();
-        self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
-        let reader = &mut self.chunk[prev_read_bytes_len..];
-        // actual bytes that have been read
-        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
-        let next_chunk_byte_len = self.chunk.len();
-        // update remaining file length
-        self.file_byte_len -= new_read_bytes_len;
-        debug!(target: "downloaders::file",
-            max_chunk_byte_len=self.chunk_byte_len,
-            prev_read_bytes_len,
-            new_read_bytes_target_len,
-            new_read_bytes_len,
-            next_chunk_byte_len,
-            remaining_file_byte_len=self.file_byte_len,
-            "new bytes were read from file"
-        );
-        Ok(Some(next_chunk_byte_len as u64))
-    }
+    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
+        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
+    }
/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
///
/// For gzipped files, this method accumulates data until at least `chunk_byte_len` bytes
/// are available before processing. For plain files, it uses the original chunking logic.
pub async fn next_chunk<B: FullBlock>(
&mut self,
consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
parent_header: Option<SealedHeader<B::Header>>,
) -> Result<Option<FileClient<B>>, FileClientError> {
-        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
+        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };

        // make new file client from chunk
        let DecodedFileChunk { file_client, remaining_bytes, .. } =
            FileClientBuilder { consensus, parent_header }
-                .build(&self.chunk[..], next_chunk_byte_len)
+                .build(&self.chunk[..], chunk_len)
.await?;
// save left over bytes
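
The leftover handling here is what lets a chunk boundary fall mid-block: bytes that did not decode into a complete block stay in `self.chunk` and are prepended to the next read. A hedged driver sketch (types and helpers as used in the tests below):

    // Decode a whole file chunk by chunk; each iteration yields a FileClient
    // over the blocks that fit completely in the current chunk.
    async fn count_headers(reader: &mut ChunkedFileReader) -> Result<usize, FileClientError> {
        let mut total = 0;
        while let Some(client) = reader.next_chunk::<Block>(NoopConsensus::arc(), None).await? {
            total += client.headers_len();
        }
        Ok(total)
    }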
@@ -513,7 +586,15 @@ impl ChunkedFileReader {
where
T: FromReceiptReader,
{
-        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
+        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
+            T::Error::from(match e {
+                FileClientError::Io(io_err) => io_err,
+                _ => io::Error::other(e.to_string()),
+            })
+        })?
+        else {
+            return Ok(None)
+        };
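        // Note: `T::Error` is only guaranteed to implement `From<io::Error>`, so IO
        // errors pass through unchanged while any other `FileClientError` variant is
        // flattened into an opaque `io::Error::other` first.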
// make new file client from chunk
let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
@@ -572,6 +653,7 @@ mod tests {
test_utils::{generate_bodies, generate_bodies_file},
};
use assert_matches::assert_matches;
use async_compression::tokio::write::GzipEncoder;
use futures_util::stream::StreamExt;
use rand::Rng;
use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
@@ -582,6 +664,10 @@ mod tests {
};
use reth_provider::test_utils::create_test_provider_factory;
use std::sync::Arc;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
};
#[tokio::test]
async fn streams_bodies_from_buffer() {
@@ -712,7 +798,8 @@ mod tests {
trace!(target: "downloaders::file::test", chunk_byte_len);
// init reader
-        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();
+        let mut reader =
+            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();
let mut downloaded_headers: Vec<SealedHeader> = vec![];
@@ -746,4 +833,79 @@ mod tests {
// the first header is not included in the response
assert_eq!(headers[1..], downloaded_headers);
}
#[tokio::test]
async fn test_chunk_download_headers_from_gzip_file() {
reth_tracing::init_test_tracing();
// Generate some random blocks
let (file, headers, _) = generate_bodies_file(0..=14).await;
// Create a gzipped version of the file
let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
let gzip_path = gzip_temp_file.path().to_owned();
        // Drop the handle; this removes the temp file, and we recreate it at the same path below
        drop(gzip_temp_file);
// Read original file content first
let mut original_file = file;
original_file.seek(SeekFrom::Start(0)).await.unwrap();
let mut original_content = Vec::new();
original_file.read_to_end(&mut original_content).await.unwrap();
let mut gzip_file = File::create(&gzip_path).await.unwrap();
let mut encoder = GzipEncoder::new(&mut gzip_file);
// Write the original content through the gzip encoder
encoder.write_all(&original_content).await.unwrap();
encoder.shutdown().await.unwrap();
drop(gzip_file);
// Reopen the gzipped file for reading
let gzip_file = File::open(&gzip_path).await.unwrap();
        // pick a random chunk byte length; the loop below skips chunks that do not yet
        // contain a complete block
let chunk_byte_len = rand::rng().random_range(2000..=10_000);
trace!(target: "downloaders::file::test", chunk_byte_len);
// init reader with gzip=true
let mut reader =
ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();
let mut downloaded_headers: Vec<SealedHeader> = vec![];
let mut local_header = headers.first().unwrap().clone();
// test
while let Some(client) =
reader.next_chunk::<Block>(NoopConsensus::arc(), None).await.unwrap()
{
if client.headers_len() == 0 {
continue;
}
let sync_target = client.tip_header().expect("tip_header should not be None");
let sync_target_hash = sync_target.hash();
// construct headers downloader and use first header
let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
.build(Arc::clone(&Arc::new(client)), Arc::new(TestConsensus::default()));
header_downloader.update_local_head(local_header.clone());
header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
// get headers first
let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
// export new local header to outer scope
local_header = sync_target;
// reverse to make sure it's in the right order before comparing
downloaded_headers_chunk.reverse();
downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
}
// the first header is not included in the response
assert_eq!(headers[1..], downloaded_headers);
}
}
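
As a closing note, the encoder side mirrors what the test does: any exported chain file can be gzipped ahead of import with the same async-compression API. A minimal sketch (paths are placeholders):

    use async_compression::tokio::write::GzipEncoder;
    use tokio::{fs::File, io::AsyncWriteExt};

    // Compress an export so the chunked reader above can ingest it as `.gz`.
    async fn gzip_export(src: &[u8], dst: &str) -> std::io::Result<()> {
        let file = File::create(dst).await?;
        let mut encoder = GzipEncoder::new(file);
        encoder.write_all(src).await?;
        // shutdown() flushes the gzip trailer before the file is closed
        encoder.shutdown().await?;
        Ok(())
    }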