mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
1 Commits
devnet4
...
segmented-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fdc7495f0c |
@@ -50,6 +50,13 @@ const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
|
||||
/// Maximum number of concurrent archive downloads.
|
||||
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
|
||||
|
||||
/// Default number of parallel segments per file download.
|
||||
const DEFAULT_DOWNLOAD_SEGMENTS: usize = 8;
|
||||
|
||||
/// Minimum file size to use segmented downloads (10 MB).
|
||||
/// Below this threshold, single-connection download is used.
|
||||
const MIN_SEGMENTED_SIZE: u64 = 10 * 1024 * 1024;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum SelectionPreset {
|
||||
Minimal,
|
||||
@@ -247,6 +254,15 @@ pub struct DownloadCommand<C: ChainSpecParser> {
|
||||
/// Maximum number of concurrent modular archive workers.
|
||||
#[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
|
||||
download_concurrency: usize,
|
||||
|
||||
/// Number of parallel segments per file download.
|
||||
///
|
||||
/// Splits each archive into N chunks and downloads them simultaneously
|
||||
/// using HTTP Range requests, similar to aria2c. Falls back to a single
|
||||
/// connection when the server doesn't support Range or the file is small.
|
||||
/// Implies `--resumable` (two-phase download).
|
||||
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_SEGMENTS)]
|
||||
download_segments: usize,
|
||||
}
|
||||
|
||||
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
|
||||
@@ -256,6 +272,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
let data_dir = self.env.datadir.clone().resolve_datadir(chain);
|
||||
fs::create_dir_all(&data_dir)?;
|
||||
|
||||
let segments = self.download_segments.max(1);
|
||||
// Segmented downloads imply resumable (two-phase) mode
|
||||
let resumable = self.resumable || segments > 1;
|
||||
|
||||
// Legacy single-URL mode: download one archive and extract it
|
||||
if let Some(url) = self.url {
|
||||
info!(target: "reth::cli",
|
||||
@@ -264,7 +284,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
"Starting snapshot download and extraction"
|
||||
);
|
||||
|
||||
stream_and_extract(&url, data_dir.data_dir(), None, self.resumable).await?;
|
||||
stream_and_extract(&url, data_dir.data_dir(), None, resumable, segments).await?;
|
||||
info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
|
||||
|
||||
return Ok(());
|
||||
@@ -334,7 +354,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
.then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
|
||||
});
|
||||
|
||||
let download_cache_dir = if self.resumable {
|
||||
let download_cache_dir = if resumable {
|
||||
let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
|
||||
fs::create_dir_all(&dir)?;
|
||||
Some(dir)
|
||||
@@ -370,7 +390,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
|
||||
let target = target_dir.to_path_buf();
|
||||
let cache_dir = download_cache_dir;
|
||||
let resumable = self.resumable;
|
||||
let download_concurrency = self.download_concurrency.max(1);
|
||||
let results: Vec<Result<()>> = stream::iter(all_downloads)
|
||||
.map(|planned| {
|
||||
@@ -378,8 +397,15 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
|
||||
let cache = cache_dir.clone();
|
||||
let sp = Arc::clone(&shared);
|
||||
async move {
|
||||
process_modular_archive(planned, &dir, cache.as_deref(), Some(sp), resumable)
|
||||
.await?;
|
||||
process_modular_archive(
|
||||
planned,
|
||||
&dir,
|
||||
cache.as_deref(),
|
||||
Some(sp),
|
||||
resumable,
|
||||
segments,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
@@ -1221,6 +1247,234 @@ fn resumable_download(
|
||||
.unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
|
||||
}
|
||||
|
||||
/// Downloads a single range segment of a file.
|
||||
///
|
||||
/// Writes to `part_path`, resuming from any existing bytes. Reports progress
|
||||
/// to `shared` when provided. Requires the server to respond with `206 Partial
|
||||
/// Content`; returns an error on a full `200 OK` so the caller can fall back.
|
||||
fn download_segment(
|
||||
url: &str,
|
||||
part_path: &Path,
|
||||
range_start: u64,
|
||||
range_end: u64,
|
||||
shared: Option<&Arc<SharedProgress>>,
|
||||
) -> Result<()> {
|
||||
let expected_size = range_end - range_start + 1;
|
||||
let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
|
||||
|
||||
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
|
||||
let existing = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
|
||||
|
||||
// Exact match only — stale oversized files are deleted
|
||||
if existing == expected_size {
|
||||
return Ok(());
|
||||
}
|
||||
if existing > expected_size {
|
||||
let _ = std::fs::remove_file(part_path);
|
||||
}
|
||||
|
||||
let existing = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
|
||||
let current_start = range_start + existing;
|
||||
let range_header = format!("bytes={current_start}-{range_end}");
|
||||
|
||||
let response = match client.get(url).header(RANGE, &range_header).send() {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
if attempt < MAX_DOWNLOAD_RETRIES {
|
||||
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
|
||||
continue;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
// Server must honour the Range request
|
||||
if response.status() != StatusCode::PARTIAL_CONTENT {
|
||||
eyre::bail!(
|
||||
"server did not honour Range request (got {}), segmented download not supported",
|
||||
response.status()
|
||||
);
|
||||
}
|
||||
|
||||
let file = if existing > 0 {
|
||||
OpenOptions::new()
|
||||
.append(true)
|
||||
.open(part_path)
|
||||
.map_err(|e| fs::FsPathError::open(e, part_path))?
|
||||
} else {
|
||||
fs::create_file(part_path)?
|
||||
};
|
||||
|
||||
let copy_result;
|
||||
let flush_result;
|
||||
|
||||
if let Some(sp) = shared {
|
||||
let mut writer =
|
||||
SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
|
||||
copy_result = io::copy(&mut { response }, &mut writer);
|
||||
flush_result = writer.inner.flush();
|
||||
} else {
|
||||
let mut writer = BufWriter::new(file);
|
||||
copy_result = io::copy(&mut { response }, &mut writer);
|
||||
flush_result = writer.flush();
|
||||
}
|
||||
|
||||
match copy_result.and(flush_result) {
|
||||
Ok(_) => {
|
||||
let final_len = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
|
||||
if final_len == expected_size {
|
||||
return Ok(());
|
||||
}
|
||||
// Short write — delete and retry
|
||||
let _ = std::fs::remove_file(part_path);
|
||||
if attempt < MAX_DOWNLOAD_RETRIES {
|
||||
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
|
||||
continue;
|
||||
}
|
||||
eyre::bail!(
|
||||
"segment size mismatch after download: got {final_len}, expected {expected_size}"
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
if attempt < MAX_DOWNLOAD_RETRIES {
|
||||
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
|
||||
continue;
|
||||
}
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
eyre::bail!("Segment download failed after {} attempts", MAX_DOWNLOAD_RETRIES)
|
||||
}
|
||||
|
||||
/// Downloads a file using multiple parallel connections with HTTP Range requests.
|
||||
///
|
||||
/// Similar to aria2c's multi-connection download. Splits the file into `segments`
|
||||
/// chunks and downloads each in parallel. Falls back to [`resumable_download`] when
|
||||
/// the server doesn't advertise `Accept-Ranges: bytes` or the file is smaller than
|
||||
/// [`MIN_SEGMENTED_SIZE`].
|
||||
fn segmented_download(
|
||||
url: &str,
|
||||
target_dir: &Path,
|
||||
segments: usize,
|
||||
shared: Option<&Arc<SharedProgress>>,
|
||||
) -> Result<(PathBuf, u64)> {
|
||||
let file_name = Url::parse(url)
|
||||
.ok()
|
||||
.and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
|
||||
.unwrap_or_else(|| "snapshot.tar".to_string());
|
||||
|
||||
let final_path = target_dir.join(&file_name);
|
||||
let quiet = shared.is_some();
|
||||
|
||||
// HEAD request to check Content-Length and Range support
|
||||
let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
|
||||
let head_resp = match client.head(url).send().and_then(|r| r.error_for_status()) {
|
||||
Ok(r) => r,
|
||||
Err(_) => return resumable_download(url, target_dir, shared),
|
||||
};
|
||||
|
||||
let total_size = match head_resp.content_length() {
|
||||
Some(s) => s,
|
||||
None => return resumable_download(url, target_dir, shared),
|
||||
};
|
||||
|
||||
let accepts_ranges = head_resp
|
||||
.headers()
|
||||
.get("Accept-Ranges")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.is_some_and(|v| v.contains("bytes"));
|
||||
|
||||
if !accepts_ranges || total_size < MIN_SEGMENTED_SIZE || segments <= 1 {
|
||||
return resumable_download(url, target_dir, shared);
|
||||
}
|
||||
|
||||
// Clamp so each segment is at least 1 byte
|
||||
let segment_count = segments.min(total_size as usize);
|
||||
let segment_size = total_size / segment_count as u64;
|
||||
if segment_size == 0 {
|
||||
return resumable_download(url, target_dir, shared);
|
||||
}
|
||||
|
||||
if !quiet {
|
||||
info!(target: "reth::cli",
|
||||
file = %file_name,
|
||||
size = %DownloadProgress::format_size(total_size),
|
||||
segments = segment_count,
|
||||
"Segmented download"
|
||||
);
|
||||
}
|
||||
|
||||
// Count already-downloaded bytes for shared progress
|
||||
if let Some(sp) = shared {
|
||||
let mut already = 0u64;
|
||||
for i in 0..segment_count {
|
||||
let part = target_dir.join(format!("{file_name}.part.{i}"));
|
||||
already += std::fs::metadata(&part).map(|m| m.len()).unwrap_or(0);
|
||||
}
|
||||
if already > 0 {
|
||||
sp.add(already);
|
||||
}
|
||||
}
|
||||
|
||||
// Download all segments in parallel
|
||||
std::thread::scope(|scope| {
|
||||
let mut handles = Vec::with_capacity(segment_count);
|
||||
|
||||
for i in 0..segment_count {
|
||||
let start = i as u64 * segment_size;
|
||||
let end = if i == segment_count - 1 {
|
||||
total_size - 1
|
||||
} else {
|
||||
(i as u64 + 1) * segment_size - 1
|
||||
};
|
||||
|
||||
let part_path = target_dir.join(format!("{file_name}.part.{i}"));
|
||||
let url = url.to_string();
|
||||
let shared = shared.map(Arc::clone);
|
||||
|
||||
handles
|
||||
.push(scope.spawn(move || {
|
||||
download_segment(&url, &part_path, start, end, shared.as_ref())
|
||||
}));
|
||||
}
|
||||
|
||||
for handle in handles {
|
||||
handle.join().map_err(|_| eyre::eyre!("Segment download thread panicked"))??;
|
||||
}
|
||||
|
||||
Ok::<_, eyre::Error>(())
|
||||
})?;
|
||||
|
||||
// Concatenate segments into final file
|
||||
{
|
||||
let mut final_file = BufWriter::new(fs::create_file(&final_path)?);
|
||||
for i in 0..segment_count {
|
||||
let part_path = target_dir.join(format!("{file_name}.part.{i}"));
|
||||
let mut part = fs::open(&part_path)?;
|
||||
io::copy(&mut part, &mut final_file)?;
|
||||
let _ = std::fs::remove_file(&part_path);
|
||||
}
|
||||
final_file.flush()?;
|
||||
}
|
||||
|
||||
// Validate final file size
|
||||
let final_len = std::fs::metadata(&final_path).map(|m| m.len()).unwrap_or(0);
|
||||
if final_len != total_size {
|
||||
let _ = std::fs::remove_file(&final_path);
|
||||
eyre::bail!(
|
||||
"final file size mismatch after concat: got {final_len}, expected {total_size}"
|
||||
);
|
||||
}
|
||||
|
||||
if !quiet {
|
||||
info!(target: "reth::cli", file = %file_name, "Segmented download complete");
|
||||
}
|
||||
|
||||
Ok((final_path, total_size))
|
||||
}
|
||||
|
||||
/// Streams a remote archive directly into the extractor without writing to disk.
|
||||
///
|
||||
/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
|
||||
@@ -1288,14 +1542,22 @@ fn streaming_download_and_extract(
|
||||
}
|
||||
|
||||
/// Fetches the snapshot from a remote URL with resume support, then extracts it.
|
||||
///
|
||||
/// When `segments > 1`, uses [`segmented_download`] to download via multiple
|
||||
/// parallel connections; otherwise falls back to [`resumable_download`].
|
||||
fn download_and_extract(
|
||||
url: &str,
|
||||
format: CompressionFormat,
|
||||
target_dir: &Path,
|
||||
shared: Option<&Arc<SharedProgress>>,
|
||||
segments: usize,
|
||||
) -> Result<()> {
|
||||
let quiet = shared.is_some();
|
||||
let (downloaded_path, total_size) = resumable_download(url, target_dir, shared)?;
|
||||
let (downloaded_path, total_size) = if segments > 1 {
|
||||
segmented_download(url, target_dir, segments, shared)?
|
||||
} else {
|
||||
resumable_download(url, target_dir, shared)?
|
||||
};
|
||||
|
||||
let file_name =
|
||||
downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
|
||||
@@ -1339,6 +1601,7 @@ fn blocking_download_and_extract(
|
||||
target_dir: &Path,
|
||||
shared: Option<Arc<SharedProgress>>,
|
||||
resumable: bool,
|
||||
segments: usize,
|
||||
) -> Result<()> {
|
||||
let format = CompressionFormat::from_url(url)?;
|
||||
|
||||
@@ -1356,7 +1619,7 @@ fn blocking_download_and_extract(
|
||||
}
|
||||
result
|
||||
} else if resumable {
|
||||
download_and_extract(url, format, target_dir, shared.as_ref())
|
||||
download_and_extract(url, format, target_dir, shared.as_ref(), segments)
|
||||
} else {
|
||||
let result = streaming_download_and_extract(url, format, target_dir, shared.as_ref());
|
||||
if result.is_ok() &&
|
||||
@@ -1378,11 +1641,12 @@ async fn stream_and_extract(
|
||||
target_dir: &Path,
|
||||
shared: Option<Arc<SharedProgress>>,
|
||||
resumable: bool,
|
||||
segments: usize,
|
||||
) -> Result<()> {
|
||||
let target_dir = target_dir.to_path_buf();
|
||||
let url = url.to_string();
|
||||
task::spawn_blocking(move || {
|
||||
blocking_download_and_extract(&url, &target_dir, shared, resumable)
|
||||
blocking_download_and_extract(&url, &target_dir, shared, resumable, segments)
|
||||
})
|
||||
.await??;
|
||||
|
||||
@@ -1395,6 +1659,7 @@ async fn process_modular_archive(
|
||||
cache_dir: Option<&Path>,
|
||||
shared: Option<Arc<SharedProgress>>,
|
||||
resumable: bool,
|
||||
segments: usize,
|
||||
) -> Result<()> {
|
||||
let target_dir = target_dir.to_path_buf();
|
||||
let cache_dir = cache_dir.map(Path::to_path_buf);
|
||||
@@ -1406,6 +1671,7 @@ async fn process_modular_archive(
|
||||
cache_dir.as_deref(),
|
||||
shared,
|
||||
resumable,
|
||||
segments,
|
||||
)
|
||||
})
|
||||
.await??;
|
||||
@@ -1419,6 +1685,7 @@ fn blocking_process_modular_archive(
|
||||
cache_dir: Option<&Path>,
|
||||
shared: Option<Arc<SharedProgress>>,
|
||||
resumable: bool,
|
||||
segments: usize,
|
||||
) -> Result<()> {
|
||||
let archive = &planned.archive;
|
||||
if verify_output_files(target_dir, &archive.output_files)? {
|
||||
@@ -1438,8 +1705,11 @@ fn blocking_process_modular_archive(
|
||||
let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
|
||||
let archive_path = cache_dir.join(&archive.file_name);
|
||||
let part_path = cache_dir.join(format!("{}.part", archive.file_name));
|
||||
let (downloaded_path, _downloaded_size) =
|
||||
resumable_download(&archive.url, cache_dir, shared.as_ref())?;
|
||||
let (downloaded_path, _downloaded_size) = if segments > 1 {
|
||||
segmented_download(&archive.url, cache_dir, segments, shared.as_ref())?
|
||||
} else {
|
||||
resumable_download(&archive.url, cache_dir, shared.as_ref())?
|
||||
};
|
||||
let file = fs::open(&downloaded_path)?;
|
||||
extract_archive_raw(file, format, target_dir)?;
|
||||
let _ = fs::remove_file(&archive_path);
|
||||
|
||||
Reference in New Issue
Block a user