Compare commits

...

2 Commits

Author SHA1 Message Date
Sergei Shulepov
8fd8730abf safe no sync 2026-01-14 15:33:11 +00:00
Sergei Shulepov
882139df2c feat(cli): support file:// URLs in reth download
Add support for extracting snapshots from local files using file:// URLs.
This is useful for users who have already downloaded snapshot archives
and want to extract them without re-downloading.

The implementation refactors the extraction logic into separate functions:
- extract_archive: common decompression and unpacking with progress
- extract_from_file: handles local file:// URLs
- download_and_extract: handles remote HTTP(S) URLs
2026-01-14 11:44:55 +00:00
6 changed files with 218 additions and 19 deletions

View File

@@ -86,6 +86,9 @@ impl DownloadDefaults {
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
);
help.push_str(self.default_base_url.as_ref());
help.push_str(
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
);
help
}
@@ -293,19 +296,14 @@ impl CompressionFormat {
}
}
/// Downloads and extracts a snapshot, blocking until finished.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let client = reqwest::blocking::Client::builder().build()?;
let response = client.get(url).send()?.error_for_status()?;
let total_size = response.content_length().ok_or_else(|| {
eyre::eyre!(
"Server did not provide Content-Length header. This is required for snapshot downloads"
)
})?;
let progress_reader = ProgressReader::new(response, total_size);
let format = CompressionFormat::from_url(url)?;
/// Extracts a compressed tar archive to the target directory with progress tracking.
fn extract_archive<R: Read>(
reader: R,
total_size: u64,
format: CompressionFormat,
target_dir: &Path,
) -> Result<()> {
let progress_reader = ProgressReader::new(reader, total_size);
match format {
CompressionFormat::Lz4 => {
@@ -322,6 +320,45 @@ fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
Ok(())
}
/// Extracts a snapshot from a local file.
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
let file = std::fs::File::open(path)?;
let total_size = file.metadata()?.len();
extract_archive(file, total_size, format, target_dir)
}
/// Streams a snapshot archive from a remote URL and unpacks it while downloading.
///
/// Issues a blocking GET request for `url`, rejects non-success HTTP statuses, and
/// passes the response body to `extract_archive` together with the size announced
/// by the server.
///
/// # Errors
///
/// Fails if the HTTP client cannot be built, the request fails or returns an error
/// status, the response has no `Content-Length`, or extraction fails.
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
    let http = reqwest::blocking::Client::builder().build()?;
    let body = http.get(url).send()?.error_for_status()?;

    // The announced length is what the progress reporting is measured against.
    let Some(content_length) = body.content_length() else {
        return Err(eyre::eyre!(
            "Server did not provide Content-Length header. This is required for snapshot downloads"
        ));
    };

    extract_archive(body, content_length, format, target_dir)
}
/// Obtains a snapshot archive and unpacks it into `target_dir`, blocking until done.
///
/// The compression format is derived from `url` first. A URL that parses with the
/// `file` scheme is read from the local filesystem; anything else (including input
/// that does not parse as a URL) is handed to the remote download path.
///
/// # Errors
///
/// Fails if the compression format cannot be determined, if a `file://` URL cannot
/// be converted to a filesystem path, or if the extraction or download fails.
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
    let format = CompressionFormat::from_url(url)?;

    match Url::parse(url) {
        Ok(local) if local.scheme() == "file" => {
            let source = local
                .to_file_path()
                .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
            extract_from_file(&source, format, target_dir)
        }
        // Parse failures and every non-`file` scheme take the download path.
        _ => download_and_extract(url, format, target_dir),
    }
}
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
let target_dir = target_dir.to_path_buf();
let url = url.to_string();
@@ -380,6 +417,7 @@ mod tests {
assert!(help.contains("Available snapshot sources:"));
assert!(help.contains("merkle.io"));
assert!(help.contains("publicnode.com"));
assert!(help.contains("file://"));
}
#[test]
@@ -404,4 +442,25 @@ mod tests {
assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
}
#[test]
fn test_compression_format_detection() {
    // `.tar.lz4` is recognised for both remote and local URLs.
    for url in ["https://example.com/snapshot.tar.lz4", "file:///path/to/snapshot.tar.lz4"] {
        assert!(matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Lz4)));
    }

    // `.tar.zst` is recognised for both remote and local URLs.
    for url in ["https://example.com/snapshot.tar.zst", "file:///path/to/snapshot.tar.zst"] {
        assert!(matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Zstd)));
    }

    // Any other extension is rejected.
    assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
}
}

View File

@@ -88,7 +88,9 @@ impl DatabaseArgs {
.with_geometry_page_size(self.page_size)
.with_growth_step(self.growth_step)
.with_max_readers(self.max_readers)
.with_sync_mode(self.sync_mode)
// TODO: temporary for testing - remove before merging
//.with_sync_mode(Some(SyncMode::SafeNoSync))
//.with_periodic_sync_interval(Some(Duration::from_millis(500)))
}
}

View File

@@ -27,7 +27,7 @@ use std::{
ops::{Deref, Range},
path::Path,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tx::Tx;
@@ -119,6 +119,24 @@ pub struct DatabaseArguments {
/// environments). Choose `SafeNoSync` if performance is more important and occasional data
/// loss is acceptable (e.g., testing or ephemeral data).
sync_mode: SyncMode,
/// The threshold in bytes for flushing data buffers to disk when [`SyncMode::SafeNoSync`] is
/// used.
///
/// This sets the interprocess/shared threshold to force flush the data buffers to disk.
/// When the amount of unsynced data exceeds this threshold, MDBX will automatically
/// flush to disk. If [`None`], the MDBX default is used.
sync_bytes: Option<usize>,
/// The interval for periodic flushing of data buffers to disk when [`SyncMode::SafeNoSync`]
/// is used.
///
/// This sets the interprocess/shared relative period since the last unsteady commit to force
/// flush the data buffers to disk. If [`None`], the MDBX default is used.
sync_period: Option<Duration>,
/// Interval for a background thread to call `mdbx_env_sync_ex` to flush data buffers to disk.
///
/// This is useful when using [`SyncMode::SafeNoSync`] to ensure data is persisted at regular
/// intervals. If [`None`], no background sync thread is spawned.
periodic_sync_interval: Option<Duration>,
}
impl Default for DatabaseArguments {
@@ -143,6 +161,9 @@ impl DatabaseArguments {
exclusive: None,
max_readers: None,
sync_mode: SyncMode::Durable,
sync_bytes: None,
sync_period: None,
periodic_sync_interval: None,
}
}
@@ -215,6 +236,36 @@ impl DatabaseArguments {
self
}
/// Sets the threshold in bytes for flushing data buffers to disk.
///
/// This only takes effect when [`SyncMode::SafeNoSync`] is used. When the amount of
/// unsynced data exceeds this threshold, MDBX will automatically flush to disk.
///
/// Builder-style setter: takes `self` by value, overwrites any previously stored
/// threshold with `sync_bytes` (including with [`None`]), and returns the updated
/// arguments.
pub const fn with_sync_bytes(mut self, sync_bytes: Option<usize>) -> Self {
self.sync_bytes = sync_bytes;
self
}
/// Sets the interval for periodic flushing of data buffers to disk.
///
/// This only takes effect when [`SyncMode::SafeNoSync`] is used. Sets the relative
/// period since the last unsteady commit to force flush the data buffers to disk.
///
/// Builder-style setter: takes `self` by value, overwrites any previously stored
/// period with `sync_period` (including with [`None`]), and returns the updated
/// arguments.
pub const fn with_sync_period(mut self, sync_period: Option<Duration>) -> Self {
self.sync_period = sync_period;
self
}
/// Sets the interval for a background thread to call `mdbx_env_sync_ex`.
///
/// This is useful when using [`SyncMode::SafeNoSync`] to ensure data is persisted at
/// regular intervals without waiting for the OS to flush.
///
/// Builder-style setter: takes `self` by value, overwrites any previously stored
/// interval with `periodic_sync_interval` (including with [`None`]), and returns the
/// updated arguments.
pub const fn with_periodic_sync_interval(
mut self,
periodic_sync_interval: Option<Duration>,
) -> Self {
self.periodic_sync_interval = periodic_sync_interval;
self
}
/// Returns the client version if any.
pub const fn client_version(&self) -> &ClientVersion {
&self.client_version
@@ -487,6 +538,18 @@ impl DatabaseEnv {
inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
}
if let Some(sync_bytes) = args.sync_bytes {
inner_env.set_sync_bytes(sync_bytes);
}
if let Some(sync_period) = args.sync_period {
inner_env.set_sync_period(sync_period);
}
if let Some(interval) = args.periodic_sync_interval {
inner_env.set_periodic_sync_interval(interval);
}
let env = Self {
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
dbis: Arc::default(),

View File

@@ -53,6 +53,7 @@ impl Environment {
log_level: None,
kind: Default::default(),
handle_slow_readers: None,
periodic_sync_interval: None,
#[cfg(feature = "read-tx-timeouts")]
max_read_transaction_duration: None,
}
@@ -598,6 +599,9 @@ pub struct EnvironmentBuilder {
log_level: Option<ffi::MDBX_log_level_t>,
kind: EnvironmentKind,
handle_slow_readers: Option<HandleSlowReadersCallback>,
/// Interval for periodic sync operations. If set, a background thread will call
/// `mdbx_env_sync_ex` at this interval to flush data buffers to disk.
periodic_sync_interval: Option<Duration>,
#[cfg(feature = "read-tx-timeouts")]
/// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is
/// enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
@@ -734,7 +738,7 @@ impl EnvironmentBuilder {
let env_ptr = EnvPtr(env);
#[cfg(not(feature = "read-tx-timeouts"))]
let txn_manager = TxnManager::new(env_ptr);
let txn_manager = TxnManager::new_with_periodic_sync(env_ptr, self.periodic_sync_interval);
#[cfg(feature = "read-tx-timeouts")]
let txn_manager = {
@@ -744,9 +748,13 @@ impl EnvironmentBuilder {
DEFAULT_MAX_READ_TRANSACTION_DURATION,
))
{
TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
TxnManager::new_with_max_read_transaction_duration(
env_ptr,
duration,
self.periodic_sync_interval,
)
} else {
TxnManager::new(env_ptr)
TxnManager::new_with_periodic_sync(env_ptr, self.periodic_sync_interval)
}
};
@@ -873,6 +881,16 @@ impl EnvironmentBuilder {
self.handle_slow_readers = Some(hsr);
self
}
/// Sets the interval for periodic sync operations.
///
/// If set, a background thread will call `mdbx_env_sync_ex` at this interval to flush
/// data buffers to disk. This is useful when using [`SyncMode::SafeNoSync`] to ensure
/// data is persisted at regular intervals without waiting for the OS to flush.
///
/// The value is always stored as `Some(interval)`, so this setter cannot clear an
/// interval that was set earlier. Returns `&mut Self` so calls can be chained.
pub const fn set_periodic_sync_interval(&mut self, interval: Duration) -> &mut Self {
self.periodic_sync_interval = Some(interval);
self
}
}
#[cfg(feature = "read-tx-timeouts")]

View File

@@ -6,6 +6,7 @@ use crate::{
use std::{
ptr,
sync::mpsc::{sync_channel, Receiver, SyncSender},
time::Duration,
};
#[derive(Copy, Clone, Debug)]
@@ -24,6 +25,7 @@ pub(crate) enum TxnManagerMessage {
/// corresponding [`TxnManagerMessage`]
/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
/// `TxnManager::with_max_read_transaction_duration` is called)
/// - Periodically syncing the environment to disk (if configured with a sync interval)
#[derive(Debug)]
pub(crate) struct TxnManager {
sender: SyncSender<TxnManagerMessage>,
@@ -32,7 +34,12 @@ pub(crate) struct TxnManager {
}
impl TxnManager {
/// Creates a [`TxnManager`] without periodic syncing.
///
/// Equivalent to calling [`Self::new_with_periodic_sync`] with `None` as the interval.
// NOTE(review): `dead_code` is presumably allowed because callers construct the manager
// through `new_with_periodic_sync` directly — confirm whether this constructor is still
// needed.
#[allow(dead_code)]
pub(crate) fn new(env: EnvPtr) -> Self {
Self::new_with_periodic_sync(env, None)
}
pub(crate) fn new_with_periodic_sync(env: EnvPtr, sync_interval: Option<Duration>) -> Self {
let (tx, rx) = sync_channel(0);
let txn_manager = Self {
sender: tx,
@@ -42,9 +49,45 @@ impl TxnManager {
txn_manager.start_message_listener(env, rx);
if let Some(interval) = sync_interval {
Self::start_sync_thread(env, interval);
}
txn_manager
}
/// Spawns a background thread that periodically calls `mdbx_env_sync_ex` to flush
/// data buffers to disk.
///
/// This is useful when using [`crate::SyncMode::SafeNoSync`] to ensure data is
/// persisted at regular intervals without waiting for the OS to flush.
///
/// The thread is named `mdbx-rs-sync` and is detached: its join handle is discarded.
/// It loops forever, sleeping for `interval` and then syncing. A failed sync is logged
/// at `warn` level and does not stop the loop.
///
/// # Panics
///
/// Panics if the thread cannot be spawned.
fn start_sync_thread(env: EnvPtr, interval: Duration) {
let task = move || {
// Capture the EnvPtr (which is Send) into the closure
let env = env;
loop {
// Sleep first, so the first sync happens one full `interval` after the thread starts.
std::thread::sleep(interval);
let _span = tracing::debug_span!(target: "libmdbx::sync", "sync").entered();
// SAFETY: relies on the env pointer remaining valid for as long as this thread runs.
// NOTE(review): the loop has no exit condition and nothing visible here stops the
// thread when the `Environment` is closed. If the environment can be dropped before
// the process exits, this call would use a dangling pointer — confirm, and consider
// a shutdown signal owned by `TxnManager`.
// The two `false` arguments are presumably `force` and `nonblock` — verify against
// the libmdbx signature.
let result = unsafe { ffi::mdbx_env_sync_ex(env.0, false, false) };
// `MDBX_RESULT_TRUE` is accepted alongside `MDBX_SUCCESS` as a non-error outcome
// (presumably "nothing to flush" — verify against the libmdbx documentation).
if result != ffi::MDBX_SUCCESS && result != ffi::MDBX_RESULT_TRUE {
tracing::warn!(
target: "libmdbx::sync",
error_code = result,
"Periodic sync failed"
);
}
}
};
std::thread::Builder::new()
.name("mdbx-rs-sync".to_string())
.spawn(task)
.expect("failed to spawn mdbx sync thread");
}
/// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
/// executes an FFI function, and returns the result on the provided channel.
///
@@ -58,6 +101,9 @@ impl TxnManager {
match rx.recv() {
Ok(msg) => match msg {
TxnManagerMessage::Begin { parent, flags, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
.entered();
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
let res = mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(
@@ -72,9 +118,13 @@ impl TxnManager {
sender.send(res).unwrap();
}
TxnManagerMessage::Abort { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
let _span =
tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
sender
.send({
let mut latency = CommitLatency::new();
@@ -120,6 +170,7 @@ mod read_transactions {
pub(crate) fn new_with_max_read_transaction_duration(
env: EnvPtr,
duration: Duration,
periodic_sync_interval: Option<Duration>,
) -> Self {
let read_transactions = Arc::new(ReadTransactions::new(duration));
read_transactions.clone().start_monitor();
@@ -130,6 +181,10 @@ mod read_transactions {
txn_manager.start_message_listener(env, rx);
if let Some(interval) = periodic_sync_interval {
Self::start_sync_thread(env, interval);
}
txn_manager
}

View File

@@ -137,7 +137,9 @@ Static Files:
- https://publicnode.com/snapshots (full nodes & testnets)
If no URL is provided, the latest mainnet archive snapshot
will be proposed for download from https://downloads.merkle.io
will be proposed for download from https://downloads.merkle.io.
Local file:// URLs are also supported for extracting snapshots from disk.
Logging:
--log.stdout.format <FORMAT>