diff --git a/Cargo.lock b/Cargo.lock
index 448d714ffc..acfb8041c8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7797,6 +7797,7 @@ dependencies = [
  "alloy-primitives",
  "alloy-rlp",
  "assert_matches",
+ "async-compression",
  "futures",
  "futures-util",
  "itertools 0.14.0",
diff --git a/Cargo.toml b/Cargo.toml
index 0ce9b94a21..b6518707a9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -597,6 +597,7 @@ tokio-tungstenite = "0.26.2"
 tokio-util = { version = "0.7.4", features = ["codec"] }
 
 # async
+async-compression = { version = "0.4", default-features = false }
 async-stream = "0.3"
 async-trait = "0.1.68"
 futures = "0.3"
diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml
index 128da4ff08..d1e31119ee 100644
--- a/crates/net/downloaders/Cargo.toml
+++ b/crates/net/downloaders/Cargo.toml
@@ -40,6 +40,7 @@ pin-project.workspace = true
 tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
 tokio-stream.workspace = true
 tokio-util = { workspace = true, features = ["codec"] }
+async-compression = { workspace = true, features = ["gzip", "tokio"] }
 
 # metrics
 reth-metrics.workspace = true
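Note on the dependency wiring above: `async-compression` enters the workspace with `default-features = false`, and only the downloaders crate opts into the `gzip` and `tokio` features it needs. As a rough orientation (not part of this change; the path is made up), the tokio-flavored `bufread::GzipDecoder` used below is driven like this:

```rust
use async_compression::tokio::bufread::GzipDecoder;
use tokio::{
    fs::File,
    io::{AsyncReadExt, BufReader},
};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Hypothetical input path; any gzip-compressed file works.
    let file = File::open("chain.rlp.gz").await?;
    // The bufread decoder wraps an `AsyncBufRead`, hence the `BufReader`.
    let mut decoder = GzipDecoder::new(BufReader::new(file));
    let mut decompressed = Vec::new();
    decoder.read_to_end(&mut decompressed).await?;
    println!("decompressed {} bytes", decompressed.len());
    Ok(())
}
```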
diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs
index 53d8c7faa1..3f6233615c 100644
--- a/crates/net/downloaders/src/file_client.rs
+++ b/crates/net/downloaders/src/file_client.rs
@@ -1,6 +1,7 @@
 use alloy_consensus::BlockHeader;
 use alloy_eips::BlockHashOrNumber;
 use alloy_primitives::{BlockHash, BlockNumber, Sealable, B256};
+use async_compression::tokio::bufread::GzipDecoder;
 use futures::Future;
 use itertools::Either;
 use reth_consensus::{Consensus, ConsensusError};
@@ -16,7 +17,10 @@ use reth_network_peers::PeerId;
 use reth_primitives_traits::{Block, BlockBody, FullBlock, SealedBlock, SealedHeader};
 use std::{collections::HashMap, io, ops::RangeInclusive, path::Path, sync::Arc};
 use thiserror::Error;
-use tokio::{fs::File, io::AsyncReadExt};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, BufReader},
+};
 use tokio_stream::StreamExt;
 use tokio_util::codec::FramedRead;
 use tracing::{debug, trace, warn};
@@ -392,13 +396,115 @@ impl<B: FullBlock> BlockClient for FileClient<B> {
     type Block = B;
 }
 
+/// File reader type for handling different compression formats.
+#[derive(Debug)]
+enum FileReader {
+    /// Regular uncompressed file with remaining byte tracking.
+    Plain { file: File, remaining_bytes: u64 },
+    /// Gzip compressed file.
+    Gzip(GzipDecoder<BufReader<File>>),
+}
+
+impl FileReader {
+    /// Read some data into the provided buffer, returning the number of bytes read.
+    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
+        match self {
+            Self::Plain { file, .. } => file.read(buf).await,
+            Self::Gzip(decoder) => decoder.read(buf).await,
+        }
+    }
+
+    /// Read next chunk from file into `chunk`. Returns the resulting chunk byte length, or
+    /// `None` once the file is exhausted.
+    async fn read_next_chunk(
+        &mut self,
+        chunk: &mut Vec<u8>,
+        chunk_byte_len: u64,
+    ) -> Result<Option<u64>, FileClientError> {
+        match self {
+            Self::Plain { .. } => self.read_plain_chunk(chunk, chunk_byte_len).await,
+            Self::Gzip(_) => {
+                Ok((self.read_gzip_chunk(chunk, chunk_byte_len).await?)
+                    .then_some(chunk.len() as u64))
+            }
+        }
+    }
+
+    async fn read_plain_chunk(
+        &mut self,
+        chunk: &mut Vec<u8>,
+        chunk_byte_len: u64,
+    ) -> Result<Option<u64>, FileClientError> {
+        let Self::Plain { file, remaining_bytes } = self else {
+            unreachable!("read_plain_chunk should only be called on Plain variant")
+        };
+
+        if *remaining_bytes == 0 && chunk.is_empty() {
+            // eof
+            return Ok(None)
+        }
+
+        let chunk_target_len = chunk_byte_len.min(*remaining_bytes + chunk.len() as u64);
+        let old_bytes_len = chunk.len() as u64;
+
+        // calculate reserved space in chunk
+        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
+
+        // read new bytes from file
+        let prev_read_bytes_len = chunk.len();
+        chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
+        let reader = &mut chunk[prev_read_bytes_len..];
+
+        // actual bytes that have been read
+        let new_read_bytes_len = file.read_exact(reader).await? as u64;
+        let next_chunk_byte_len = chunk.len();
+
+        // update remaining file length
+        *remaining_bytes -= new_read_bytes_len;
+
+        debug!(target: "downloaders::file",
+            max_chunk_byte_len=chunk_byte_len,
+            prev_read_bytes_len,
+            new_read_bytes_target_len,
+            new_read_bytes_len,
+            next_chunk_byte_len,
+            remaining_file_byte_len=*remaining_bytes,
+            "new bytes were read from file"
+        );
+
+        Ok(Some(next_chunk_byte_len as u64))
+    }
+
+    /// Read next chunk from gzipped file.
+    async fn read_gzip_chunk(
+        &mut self,
+        chunk: &mut Vec<u8>,
+        chunk_byte_len: u64,
+    ) -> Result<bool, FileClientError> {
+        loop {
+            if chunk.len() >= chunk_byte_len as usize {
+                return Ok(true)
+            }
+
+            let mut buffer = vec![0u8; 64 * 1024];
+
+            match self.read(&mut buffer).await {
+                Ok(0) => return Ok(!chunk.is_empty()),
+                Ok(n) => {
+                    buffer.truncate(n);
+                    chunk.extend_from_slice(&buffer);
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+    }
+}
+
 /// Chunks file into several [`FileClient`]s.
 #[derive(Debug)]
 pub struct ChunkedFileReader {
-    /// File to read from.
-    file: File,
-    /// Current file byte length.
-    file_byte_len: u64,
+    /// File reader (either plain or gzip).
+    file: FileReader,
     /// Bytes that have been read.
     chunk: Vec<u8>,
     /// Max bytes per chunk.
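The split above reflects what each variant can know: `Plain` learns the remaining byte count up front from file metadata, while `Gzip` cannot know the decompressed size ahead of time and therefore accumulates fixed 64 KiB reads until at least one chunk's worth is buffered. A self-contained sketch of that accumulation contract, generic over any `AsyncRead` source (the helper name and `main` are illustrative, not part of the diff):

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

/// Buffer bytes until `chunk_byte_len` is reached or the source is exhausted.
/// `Ok(false)` means EOF with nothing buffered, mirroring `read_gzip_chunk`.
async fn fill_chunk<R: AsyncRead + Unpin>(
    source: &mut R,
    chunk: &mut Vec<u8>,
    chunk_byte_len: u64,
) -> std::io::Result<bool> {
    loop {
        if chunk.len() >= chunk_byte_len as usize {
            return Ok(true);
        }
        let mut buffer = vec![0u8; 64 * 1024];
        match source.read(&mut buffer).await? {
            0 => return Ok(!chunk.is_empty()),
            n => chunk.extend_from_slice(&buffer[..n]),
        }
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // An in-memory slice stands in for the gzip decoder here.
    let mut source = &b"0123456789"[..];
    let mut chunk = Vec::new();
    assert!(fill_chunk(&mut source, &mut chunk, 4).await?);
    assert!(chunk.len() >= 4);
    Ok(())
}
```

Note that the chunk may overshoot `chunk_byte_len` by up to one 64 KiB read; the layer above tolerates this because leftover bytes are carried into the next chunk.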
@@ -409,97 +515,64 @@ pub struct ChunkedFileReader {
 }
 
 impl ChunkedFileReader {
-    /// Returns the remaining file length.
-    pub const fn file_len(&self) -> u64 {
-        self.file_byte_len
-    }
-
     /// Opens the file to import from given path. Returns a new instance. If no chunk byte length
     /// is passed, chunks have [`DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE`] (one static file).
+    /// Automatically detects gzip files by extension (`.gz`, `.gzip`).
     pub async fn new<P: AsRef<Path>>(
         path: P,
         chunk_byte_len: Option<u64>,
     ) -> Result<Self, FileClientError> {
+        let path = path.as_ref();
         let file = File::open(path).await?;
         let chunk_byte_len = chunk_byte_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE);
 
-        Self::from_file(file, chunk_byte_len).await
+        Self::from_file(
+            file,
+            chunk_byte_len,
+            path.extension()
+                .and_then(|ext| ext.to_str())
+                .is_some_and(|ext| ["gz", "gzip"].contains(&ext)),
+        )
+        .await
     }
 
     /// Opens the file to import from given path. Returns a new instance.
-    pub async fn from_file(file: File, chunk_byte_len: u64) -> Result<Self, FileClientError> {
-        // get file len from metadata before reading
-        let metadata = file.metadata().await?;
-        let file_byte_len = metadata.len();
-
-        Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
-    }
-
-    /// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
-    /// length and the remaining file length.
-    const fn chunk_len(&self) -> u64 {
-        let Self { chunk_byte_len, file_byte_len, .. } = *self;
-        let file_byte_len = file_byte_len + self.chunk.len() as u64;
-
-        if chunk_byte_len > file_byte_len {
-            // last chunk
-            file_byte_len
-        } else {
-            chunk_byte_len
-        }
-    }
+    pub async fn from_file(
+        file: File,
+        chunk_byte_len: u64,
+        is_gzip: bool,
+    ) -> Result<Self, FileClientError> {
+        let file_reader = if is_gzip {
+            FileReader::Gzip(GzipDecoder::new(BufReader::new(file)))
+        } else {
+            let remaining_bytes = file.metadata().await?.len();
+            FileReader::Plain { file, remaining_bytes }
+        };
+
+        Ok(Self { file: file_reader, chunk: vec![], chunk_byte_len, highest_block: None })
+    }
 
     /// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
     /// chunk to read.
-    async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
-        if self.file_byte_len == 0 && self.chunk.is_empty() {
-            // eof
-            return Ok(None)
-        }
-
-        let chunk_target_len = self.chunk_len();
-        let old_bytes_len = self.chunk.len() as u64;
-
-        // calculate reserved space in chunk
-        let new_read_bytes_target_len = chunk_target_len - old_bytes_len;
-
-        // read new bytes from file
-        let prev_read_bytes_len = self.chunk.len();
-        self.chunk.extend(std::iter::repeat_n(0, new_read_bytes_target_len as usize));
-        let reader = &mut self.chunk[prev_read_bytes_len..];
-
-        // actual bytes that have been read
-        let new_read_bytes_len = self.file.read_exact(reader).await? as u64;
-        let next_chunk_byte_len = self.chunk.len();
-
-        // update remaining file length
-        self.file_byte_len -= new_read_bytes_len;
-
-        debug!(target: "downloaders::file",
-            max_chunk_byte_len=self.chunk_byte_len,
-            prev_read_bytes_len,
-            new_read_bytes_target_len,
-            new_read_bytes_len,
-            next_chunk_byte_len,
-            remaining_file_byte_len=self.file_byte_len,
-            "new bytes were read from file"
-        );
-
-        Ok(Some(next_chunk_byte_len as u64))
+    async fn read_next_chunk(&mut self) -> Result<Option<u64>, FileClientError> {
+        self.file.read_next_chunk(&mut self.chunk, self.chunk_byte_len).await
    }
 
     /// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
+    ///
+    /// For gzipped files, this method accumulates data until at least `chunk_byte_len` bytes
+    /// are available before processing. For plain files, it reads the remaining bytes up to
+    /// `chunk_byte_len`.
     pub async fn next_chunk<B: FullBlock>(
         &mut self,
         consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
         parent_header: Option<SealedHeader<B::Header>>,
     ) -> Result<Option<FileClient<B>>, FileClientError> {
-        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
+        let Some(chunk_len) = self.read_next_chunk().await? else { return Ok(None) };
 
         // make new file client from chunk
         let DecodedFileChunk { file_client, remaining_bytes, .. } =
             FileClientBuilder { consensus, parent_header }
-                .build(&self.chunk[..], next_chunk_byte_len)
+                .build(&self.chunk[..], chunk_len)
                 .await?;
 
         // save left over bytes
@@ -513,7 +586,15 @@ impl ChunkedFileReader {
     where
         T: FromReceiptReader,
     {
-        let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };
+        let Some(next_chunk_byte_len) = self.read_next_chunk().await.map_err(|e| {
+            T::Error::from(match e {
+                FileClientError::Io(io_err) => io_err,
+                _ => io::Error::other(e.to_string()),
+            })
+        })?
+        else {
+            return Ok(None)
+        };
 
         // make new file client from chunk
         let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
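Gzip handling is decided purely from the file name, never by sniffing contents. A standalone mirror of the check inside `new` (the free function is made up for illustration):

```rust
use std::path::Path;

/// Illustrative stand-in for the extension check in `ChunkedFileReader::new`.
fn is_gzip(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| ["gz", "gzip"].contains(&ext))
}

fn main() {
    assert!(is_gzip(Path::new("chain.rlp.gz")));
    assert!(is_gzip(Path::new("chain.gzip")));
    assert!(!is_gzip(Path::new("chain.rlp")));
    // The match is case-sensitive, so `.GZ` falls back to the plain reader.
    assert!(!is_gzip(Path::new("chain.GZ")));
}
```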
@@ -572,6 +653,7 @@ mod tests {
         test_utils::{generate_bodies, generate_bodies_file},
     };
     use assert_matches::assert_matches;
+    use async_compression::tokio::write::GzipEncoder;
     use futures_util::stream::StreamExt;
     use rand::Rng;
     use reth_consensus::{noop::NoopConsensus, test_utils::TestConsensus};
@@ -582,6 +664,10 @@ mod tests {
     };
     use reth_provider::test_utils::create_test_provider_factory;
     use std::sync::Arc;
+    use tokio::{
+        fs::File,
+        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom},
+    };
 
     #[tokio::test]
     async fn streams_bodies_from_buffer() {
@@ -712,7 +798,8 @@ mod tests {
         trace!(target: "downloaders::file::test", chunk_byte_len);
 
         // init reader
-        let mut reader = ChunkedFileReader::from_file(file, chunk_byte_len as u64).await.unwrap();
+        let mut reader =
+            ChunkedFileReader::from_file(file, chunk_byte_len as u64, false).await.unwrap();
 
         let mut downloaded_headers: Vec<SealedHeader> = vec![];
 
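The new test below produces its fixture with the write-side `GzipEncoder`. The `shutdown()` call is load-bearing: it finishes the gzip stream and flushes the trailer, and without it the decoder would see truncated input. A minimal in-memory round-trip of the same pattern (buffers instead of temp files; the payload is arbitrary):

```rust
use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let payload = b"rlp-encoded blocks would go here".to_vec();

    // Compress into an in-memory Vec<u8>.
    let mut encoder = GzipEncoder::new(Vec::new());
    encoder.write_all(&payload).await?;
    // Finish the stream; skipping this leaves the gzip trailer unwritten.
    encoder.shutdown().await?;
    let compressed = encoder.into_inner();

    // Decompress and compare.
    let mut decoder = GzipDecoder::new(BufReader::new(&compressed[..]));
    let mut roundtrip = Vec::new();
    decoder.read_to_end(&mut roundtrip).await?;
    assert_eq!(roundtrip, payload);
    Ok(())
}
```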
@@ -746,4 +833,79 @@ mod tests {
         // the first header is not included in the response
         assert_eq!(headers[1..], downloaded_headers);
     }
+
+    #[tokio::test]
+    async fn test_chunk_download_headers_from_gzip_file() {
+        reth_tracing::init_test_tracing();
+
+        // Generate some random blocks
+        let (file, headers, _) = generate_bodies_file(0..=14).await;
+
+        // Create a gzipped version of the file
+        let gzip_temp_file = tempfile::NamedTempFile::new().unwrap();
+        let gzip_path = gzip_temp_file.path().to_owned();
+        drop(gzip_temp_file); // dropping deletes the temp file, leaving a free path to recreate
+
+        // Read original file content first
+        let mut original_file = file;
+        original_file.seek(SeekFrom::Start(0)).await.unwrap();
+        let mut original_content = Vec::new();
+        original_file.read_to_end(&mut original_content).await.unwrap();
+
+        let mut gzip_file = File::create(&gzip_path).await.unwrap();
+        let mut encoder = GzipEncoder::new(&mut gzip_file);
+
+        // Write the original content through the gzip encoder
+        encoder.write_all(&original_content).await.unwrap();
+        encoder.shutdown().await.unwrap();
+        drop(gzip_file);
+
+        // Reopen the gzipped file for reading
+        let gzip_file = File::open(&gzip_path).await.unwrap();
+
+        // pick a lower bound for the chunk byte length range that guarantees at least
+        // one block will be read
+        let chunk_byte_len = rand::rng().random_range(2000..=10_000);
+        trace!(target: "downloaders::file::test", chunk_byte_len);
+
+        // init reader with gzip=true
+        let mut reader =
+            ChunkedFileReader::from_file(gzip_file, chunk_byte_len as u64, true).await.unwrap();
+
+        let mut downloaded_headers: Vec<SealedHeader> = vec![];
+
+        let mut local_header = headers.first().unwrap().clone();
+
+        // test
+        while let Some(client) = reader
+            .next_chunk::<reth_ethereum_primitives::Block>(NoopConsensus::arc(), None)
+            .await
+            .unwrap()
+        {
+            if client.headers_len() == 0 {
+                continue;
+            }
+
+            let sync_target = client.tip_header().expect("tip_header should not be None");
+
+            let sync_target_hash = sync_target.hash();
+
+            // construct headers downloader and use first header
+            let mut header_downloader = ReverseHeadersDownloaderBuilder::default()
+                .build(Arc::new(client), Arc::new(TestConsensus::default()));
+            header_downloader.update_local_head(local_header.clone());
+            header_downloader.update_sync_target(SyncTarget::Tip(sync_target_hash));
+
+            // get headers first
+            let mut downloaded_headers_chunk = header_downloader.next().await.unwrap().unwrap();
+
+            // export new local header to outer scope
+            local_header = sync_target;
+
+            // reverse to make sure it's in the right order before comparing
+            downloaded_headers_chunk.reverse();
+            downloaded_headers.extend_from_slice(&downloaded_headers_chunk);
+        }
+
+        // the first header is not included in the response
+        assert_eq!(headers[1..], downloaded_headers);
+    }
 }
diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs
index b155bbb9e3..60638779ee 100644
--- a/crates/optimism/cli/src/commands/import_receipts.rs
+++ b/crates/optimism/cli/src/commands/import_receipts.rs
@@ -309,8 +309,9 @@ mod test {
         f.flush().await.unwrap();
         f.seek(SeekFrom::Start(0)).await.unwrap();
 
-        let reader =
-            ChunkedFileReader::from_file(f, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE).await.unwrap();
+        let reader = ChunkedFileReader::from_file(f, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE, false)
+            .await
+            .unwrap();
 
         let db = TestStageDB::default();
         init_genesis(&db.factory).unwrap();
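A closing design note: keying off the `.gz`/`.gzip` extension keeps `ChunkedFileReader::new` cheap and side-effect free, but a misnamed file is handed to the RLP decoder raw. If content sniffing were ever wanted instead, gzip streams always start with the magic bytes `0x1f 0x8b`; a hypothetical sketch, not part of this change:

```rust
use std::io;
use tokio::{fs::File, io::AsyncReadExt};

/// Hypothetical alternative: detect gzip from the two magic bytes instead of
/// the file extension. Files shorter than two bytes count as not gzipped.
async fn sniff_gzip(path: &str) -> io::Result<bool> {
    let mut file = File::open(path).await?;
    let mut magic = [0u8; 2];
    match file.read_exact(&mut magic).await {
        Ok(_) => Ok(magic == [0x1f, 0x8b]),
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
        Err(e) => Err(e),
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    // The caller would reopen or rewind the file afterwards, since sniffing
    // consumes the first two bytes.
    println!("gzip: {}", sniff_gzip("chain.rlp").await?);
    Ok(())
}
```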