Compare commits

...

1 Commits

Author SHA1 Message Date
Derek Cofausper
fdc7495f0c feat(download): add aria2c-style segmented parallel downloads
Splits each archive into N chunks and downloads them simultaneously using
HTTP Range requests. Falls back to single-connection when the server does
not support Range or the file is too small (<10 MB).

New flag: --download-segments <N> (default 8). Implies --resumable.

Co-authored-by: zhygis <5236121+Zygimantass@users.noreply.github.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019cc319-d782-731d-aa75-fe7aa86f6e88
2026-03-06 12:51:00 +00:00

View File

@@ -50,6 +50,13 @@ const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
/// Maximum number of concurrent archive downloads.
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
/// Default number of parallel segments per file download.
const DEFAULT_DOWNLOAD_SEGMENTS: usize = 8;
/// Minimum file size to use segmented downloads (10 MB).
/// Below this threshold, single-connection download is used.
const MIN_SEGMENTED_SIZE: u64 = 10 * 1024 * 1024;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
Minimal,
@@ -247,6 +254,15 @@ pub struct DownloadCommand<C: ChainSpecParser> {
/// Maximum number of concurrent modular archive workers.
#[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
download_concurrency: usize,
/// Number of parallel segments per file download.
///
/// Splits each archive into N chunks and downloads them simultaneously
/// using HTTP Range requests, similar to aria2c. Falls back to a single
/// connection when the server doesn't support Range or the file is small.
/// Implies `--resumable` (two-phase download).
#[arg(long, default_value_t = DEFAULT_DOWNLOAD_SEGMENTS)]
download_segments: usize,
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
@@ -256,6 +272,10 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let data_dir = self.env.datadir.clone().resolve_datadir(chain);
fs::create_dir_all(&data_dir)?;
let segments = self.download_segments.max(1);
// Segmented downloads imply resumable (two-phase) mode
let resumable = self.resumable || segments > 1;
// Legacy single-URL mode: download one archive and extract it
if let Some(url) = self.url {
info!(target: "reth::cli",
@@ -264,7 +284,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
"Starting snapshot download and extraction"
);
stream_and_extract(&url, data_dir.data_dir(), None, self.resumable).await?;
stream_and_extract(&url, data_dir.data_dir(), None, resumable, segments).await?;
info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
return Ok(());
@@ -334,7 +354,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
.then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
});
let download_cache_dir = if self.resumable {
let download_cache_dir = if resumable {
let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
fs::create_dir_all(&dir)?;
Some(dir)
@@ -370,7 +390,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let target = target_dir.to_path_buf();
let cache_dir = download_cache_dir;
let resumable = self.resumable;
let download_concurrency = self.download_concurrency.max(1);
let results: Vec<Result<()>> = stream::iter(all_downloads)
.map(|planned| {
@@ -378,8 +397,15 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCo
let cache = cache_dir.clone();
let sp = Arc::clone(&shared);
async move {
process_modular_archive(planned, &dir, cache.as_deref(), Some(sp), resumable)
.await?;
process_modular_archive(
planned,
&dir,
cache.as_deref(),
Some(sp),
resumable,
segments,
)
.await?;
Ok(())
}
})
@@ -1221,6 +1247,234 @@ fn resumable_download(
.unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
}
/// Downloads a single range segment of a file.
///
/// Writes to `part_path`, resuming from any existing bytes. Reports progress
/// to `shared` when provided. Requires the server to respond with `206 Partial
/// Content`; returns an error on a full `200 OK` so the caller can fall back.
fn download_segment(
url: &str,
part_path: &Path,
range_start: u64,
range_end: u64,
shared: Option<&Arc<SharedProgress>>,
) -> Result<()> {
let expected_size = range_end - range_start + 1;
let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
for attempt in 1..=MAX_DOWNLOAD_RETRIES {
let existing = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
// Exact match only — stale oversized files are deleted
if existing == expected_size {
return Ok(());
}
if existing > expected_size {
let _ = std::fs::remove_file(part_path);
}
let existing = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
let current_start = range_start + existing;
let range_header = format!("bytes={current_start}-{range_end}");
let response = match client.get(url).header(RANGE, &range_header).send() {
Ok(r) => r,
Err(e) => {
if attempt < MAX_DOWNLOAD_RETRIES {
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
continue;
}
return Err(e.into());
}
};
// Server must honour the Range request
if response.status() != StatusCode::PARTIAL_CONTENT {
eyre::bail!(
"server did not honour Range request (got {}), segmented download not supported",
response.status()
);
}
let file = if existing > 0 {
OpenOptions::new()
.append(true)
.open(part_path)
.map_err(|e| fs::FsPathError::open(e, part_path))?
} else {
fs::create_file(part_path)?
};
let copy_result;
let flush_result;
if let Some(sp) = shared {
let mut writer =
SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
copy_result = io::copy(&mut { response }, &mut writer);
flush_result = writer.inner.flush();
} else {
let mut writer = BufWriter::new(file);
copy_result = io::copy(&mut { response }, &mut writer);
flush_result = writer.flush();
}
match copy_result.and(flush_result) {
Ok(_) => {
let final_len = std::fs::metadata(part_path).map(|m| m.len()).unwrap_or(0);
if final_len == expected_size {
return Ok(());
}
// Short write — delete and retry
let _ = std::fs::remove_file(part_path);
if attempt < MAX_DOWNLOAD_RETRIES {
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
continue;
}
eyre::bail!(
"segment size mismatch after download: got {final_len}, expected {expected_size}"
);
}
Err(e) => {
if attempt < MAX_DOWNLOAD_RETRIES {
std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
continue;
}
return Err(e.into());
}
}
}
eyre::bail!("Segment download failed after {} attempts", MAX_DOWNLOAD_RETRIES)
}
/// Downloads a file using multiple parallel connections with HTTP Range requests.
///
/// Similar to aria2c's multi-connection download. Splits the file into `segments`
/// chunks and downloads each in parallel. Falls back to [`resumable_download`] when
/// the server doesn't advertise `Accept-Ranges: bytes` or the file is smaller than
/// [`MIN_SEGMENTED_SIZE`].
fn segmented_download(
url: &str,
target_dir: &Path,
segments: usize,
shared: Option<&Arc<SharedProgress>>,
) -> Result<(PathBuf, u64)> {
let file_name = Url::parse(url)
.ok()
.and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
.unwrap_or_else(|| "snapshot.tar".to_string());
let final_path = target_dir.join(&file_name);
let quiet = shared.is_some();
// HEAD request to check Content-Length and Range support
let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
let head_resp = match client.head(url).send().and_then(|r| r.error_for_status()) {
Ok(r) => r,
Err(_) => return resumable_download(url, target_dir, shared),
};
let total_size = match head_resp.content_length() {
Some(s) => s,
None => return resumable_download(url, target_dir, shared),
};
let accepts_ranges = head_resp
.headers()
.get("Accept-Ranges")
.and_then(|v| v.to_str().ok())
.is_some_and(|v| v.contains("bytes"));
if !accepts_ranges || total_size < MIN_SEGMENTED_SIZE || segments <= 1 {
return resumable_download(url, target_dir, shared);
}
// Clamp so each segment is at least 1 byte
let segment_count = segments.min(total_size as usize);
let segment_size = total_size / segment_count as u64;
if segment_size == 0 {
return resumable_download(url, target_dir, shared);
}
if !quiet {
info!(target: "reth::cli",
file = %file_name,
size = %DownloadProgress::format_size(total_size),
segments = segment_count,
"Segmented download"
);
}
// Count already-downloaded bytes for shared progress
if let Some(sp) = shared {
let mut already = 0u64;
for i in 0..segment_count {
let part = target_dir.join(format!("{file_name}.part.{i}"));
already += std::fs::metadata(&part).map(|m| m.len()).unwrap_or(0);
}
if already > 0 {
sp.add(already);
}
}
// Download all segments in parallel
std::thread::scope(|scope| {
let mut handles = Vec::with_capacity(segment_count);
for i in 0..segment_count {
let start = i as u64 * segment_size;
let end = if i == segment_count - 1 {
total_size - 1
} else {
(i as u64 + 1) * segment_size - 1
};
let part_path = target_dir.join(format!("{file_name}.part.{i}"));
let url = url.to_string();
let shared = shared.map(Arc::clone);
handles
.push(scope.spawn(move || {
download_segment(&url, &part_path, start, end, shared.as_ref())
}));
}
for handle in handles {
handle.join().map_err(|_| eyre::eyre!("Segment download thread panicked"))??;
}
Ok::<_, eyre::Error>(())
})?;
// Concatenate segments into final file
{
let mut final_file = BufWriter::new(fs::create_file(&final_path)?);
for i in 0..segment_count {
let part_path = target_dir.join(format!("{file_name}.part.{i}"));
let mut part = fs::open(&part_path)?;
io::copy(&mut part, &mut final_file)?;
let _ = std::fs::remove_file(&part_path);
}
final_file.flush()?;
}
// Validate final file size
let final_len = std::fs::metadata(&final_path).map(|m| m.len()).unwrap_or(0);
if final_len != total_size {
let _ = std::fs::remove_file(&final_path);
eyre::bail!(
"final file size mismatch after concat: got {final_len}, expected {total_size}"
);
}
if !quiet {
info!(target: "reth::cli", file = %file_name, "Segmented download complete");
}
Ok((final_path, total_size))
}
/// Streams a remote archive directly into the extractor without writing to disk.
///
/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
@@ -1288,14 +1542,22 @@ fn streaming_download_and_extract(
}
/// Fetches the snapshot from a remote URL with resume support, then extracts it.
///
/// When `segments > 1`, uses [`segmented_download`] to download via multiple
/// parallel connections; otherwise falls back to [`resumable_download`].
fn download_and_extract(
url: &str,
format: CompressionFormat,
target_dir: &Path,
shared: Option<&Arc<SharedProgress>>,
segments: usize,
) -> Result<()> {
let quiet = shared.is_some();
let (downloaded_path, total_size) = resumable_download(url, target_dir, shared)?;
let (downloaded_path, total_size) = if segments > 1 {
segmented_download(url, target_dir, segments, shared)?
} else {
resumable_download(url, target_dir, shared)?
};
let file_name =
downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
@@ -1339,6 +1601,7 @@ fn blocking_download_and_extract(
target_dir: &Path,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
segments: usize,
) -> Result<()> {
let format = CompressionFormat::from_url(url)?;
@@ -1356,7 +1619,7 @@ fn blocking_download_and_extract(
}
result
} else if resumable {
download_and_extract(url, format, target_dir, shared.as_ref())
download_and_extract(url, format, target_dir, shared.as_ref(), segments)
} else {
let result = streaming_download_and_extract(url, format, target_dir, shared.as_ref());
if result.is_ok() &&
@@ -1378,11 +1641,12 @@ async fn stream_and_extract(
target_dir: &Path,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
segments: usize,
) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
task::spawn_blocking(move || {
blocking_download_and_extract(&url, &target_dir, shared, resumable)
blocking_download_and_extract(&url, &target_dir, shared, resumable, segments)
})
.await??;
@@ -1395,6 +1659,7 @@ async fn process_modular_archive(
cache_dir: Option<&Path>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
segments: usize,
) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let cache_dir = cache_dir.map(Path::to_path_buf);
@@ -1406,6 +1671,7 @@ async fn process_modular_archive(
cache_dir.as_deref(),
shared,
resumable,
segments,
)
})
.await??;
@@ -1419,6 +1685,7 @@ fn blocking_process_modular_archive(
cache_dir: Option<&Path>,
shared: Option<Arc<SharedProgress>>,
resumable: bool,
segments: usize,
) -> Result<()> {
let archive = &planned.archive;
if verify_output_files(target_dir, &archive.output_files)? {
@@ -1438,8 +1705,11 @@ fn blocking_process_modular_archive(
let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
let archive_path = cache_dir.join(&archive.file_name);
let part_path = cache_dir.join(format!("{}.part", archive.file_name));
let (downloaded_path, _downloaded_size) =
resumable_download(&archive.url, cache_dir, shared.as_ref())?;
let (downloaded_path, _downloaded_size) = if segments > 1 {
segmented_download(&archive.url, cache_dir, segments, shared.as_ref())?
} else {
resumable_download(&archive.url, cache_dir, shared.as_ref())?
};
let file = fs::open(&downloaded_path)?;
extract_archive_raw(file, format, target_dir)?;
let _ = fs::remove_file(&archive_path);