mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
push
...
pep/safeno
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8fd8730abf | ||
|
|
882139df2c |
@@ -86,6 +86,9 @@ impl DownloadDefaults {
|
||||
"\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
|
||||
);
|
||||
help.push_str(self.default_base_url.as_ref());
|
||||
help.push_str(
|
||||
".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
|
||||
);
|
||||
help
|
||||
}
|
||||
|
||||
@@ -293,19 +296,14 @@ impl CompressionFormat {
|
||||
}
|
||||
}
|
||||
|
||||
/// Downloads and extracts a snapshot, blocking until finished.
|
||||
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
let client = reqwest::blocking::Client::builder().build()?;
|
||||
let response = client.get(url).send()?.error_for_status()?;
|
||||
|
||||
let total_size = response.content_length().ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"Server did not provide Content-Length header. This is required for snapshot downloads"
|
||||
)
|
||||
})?;
|
||||
|
||||
let progress_reader = ProgressReader::new(response, total_size);
|
||||
let format = CompressionFormat::from_url(url)?;
|
||||
/// Extracts a compressed tar archive to the target directory with progress tracking.
|
||||
fn extract_archive<R: Read>(
|
||||
reader: R,
|
||||
total_size: u64,
|
||||
format: CompressionFormat,
|
||||
target_dir: &Path,
|
||||
) -> Result<()> {
|
||||
let progress_reader = ProgressReader::new(reader, total_size);
|
||||
|
||||
match format {
|
||||
CompressionFormat::Lz4 => {
|
||||
@@ -322,6 +320,45 @@ fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Extracts a snapshot from a local file.
|
||||
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
|
||||
let file = std::fs::File::open(path)?;
|
||||
let total_size = file.metadata()?.len();
|
||||
extract_archive(file, total_size, format, target_dir)
|
||||
}
|
||||
|
||||
/// Fetches the snapshot from a remote URL, uncompressing it in a streaming fashion.
|
||||
fn download_and_extract(url: &str, format: CompressionFormat, target_dir: &Path) -> Result<()> {
|
||||
let client = reqwest::blocking::Client::builder().build()?;
|
||||
let response = client.get(url).send()?.error_for_status()?;
|
||||
|
||||
let total_size = response.content_length().ok_or_else(|| {
|
||||
eyre::eyre!(
|
||||
"Server did not provide Content-Length header. This is required for snapshot downloads"
|
||||
)
|
||||
})?;
|
||||
|
||||
extract_archive(response, total_size, format, target_dir)
|
||||
}
|
||||
|
||||
/// Downloads and extracts a snapshot, blocking until finished.
|
||||
///
|
||||
/// Supports both `file://` URLs for local files and HTTP(S) URLs for remote downloads.
|
||||
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
let format = CompressionFormat::from_url(url)?;
|
||||
|
||||
if let Ok(parsed_url) = Url::parse(url) &&
|
||||
parsed_url.scheme() == "file"
|
||||
{
|
||||
let file_path = parsed_url
|
||||
.to_file_path()
|
||||
.map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
|
||||
extract_from_file(&file_path, format, target_dir)
|
||||
} else {
|
||||
download_and_extract(url, format, target_dir)
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
|
||||
let target_dir = target_dir.to_path_buf();
|
||||
let url = url.to_string();
|
||||
@@ -380,6 +417,7 @@ mod tests {
|
||||
assert!(help.contains("Available snapshot sources:"));
|
||||
assert!(help.contains("merkle.io"));
|
||||
assert!(help.contains("publicnode.com"));
|
||||
assert!(help.contains("file://"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -404,4 +442,25 @@ mod tests {
|
||||
assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
|
||||
assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_compression_format_detection() {
|
||||
assert!(matches!(
|
||||
CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
|
||||
Ok(CompressionFormat::Lz4)
|
||||
));
|
||||
assert!(matches!(
|
||||
CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
|
||||
Ok(CompressionFormat::Zstd)
|
||||
));
|
||||
assert!(matches!(
|
||||
CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
|
||||
Ok(CompressionFormat::Lz4)
|
||||
));
|
||||
assert!(matches!(
|
||||
CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
|
||||
Ok(CompressionFormat::Zstd)
|
||||
));
|
||||
assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +88,9 @@ impl DatabaseArgs {
|
||||
.with_geometry_page_size(self.page_size)
|
||||
.with_growth_step(self.growth_step)
|
||||
.with_max_readers(self.max_readers)
|
||||
.with_sync_mode(self.sync_mode)
|
||||
// TODO: temporary for testing - remove before merging
|
||||
//.with_sync_mode(Some(SyncMode::SafeNoSync))
|
||||
//.with_periodic_sync_interval(Some(Duration::from_millis(500)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use std::{
|
||||
ops::{Deref, Range},
|
||||
path::Path,
|
||||
sync::Arc,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
time::{Duration, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tx::Tx;
|
||||
|
||||
@@ -119,6 +119,24 @@ pub struct DatabaseArguments {
|
||||
/// environments). Choose `SafeNoSync` if performance is more important and occasional data
|
||||
/// loss is acceptable (e.g., testing or ephemeral data).
|
||||
sync_mode: SyncMode,
|
||||
/// The threshold in bytes for flushing data buffers to disk when [`SyncMode::SafeNoSync`] is
|
||||
/// used.
|
||||
///
|
||||
/// This sets the interprocess/shared threshold to force flush the data buffers to disk.
|
||||
/// When the amount of unsynced data exceeds this threshold, MDBX will automatically
|
||||
/// flush to disk. If [`None`], the MDBX default is used.
|
||||
sync_bytes: Option<usize>,
|
||||
/// The interval for periodic flushing of data buffers to disk when [`SyncMode::SafeNoSync`]
|
||||
/// is used.
|
||||
///
|
||||
/// This sets the interprocess/shared relative period since the last unsteady commit to force
|
||||
/// flush the data buffers to disk. If [`None`], the MDBX default is used.
|
||||
sync_period: Option<Duration>,
|
||||
/// Interval for a background thread to call `mdbx_env_sync_ex` to flush data buffers to disk.
|
||||
///
|
||||
/// This is useful when using [`SyncMode::SafeNoSync`] to ensure data is persisted at regular
|
||||
/// intervals. If [`None`], no background sync thread is spawned.
|
||||
periodic_sync_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for DatabaseArguments {
|
||||
@@ -143,6 +161,9 @@ impl DatabaseArguments {
|
||||
exclusive: None,
|
||||
max_readers: None,
|
||||
sync_mode: SyncMode::Durable,
|
||||
sync_bytes: None,
|
||||
sync_period: None,
|
||||
periodic_sync_interval: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,6 +236,36 @@ impl DatabaseArguments {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the threshold in bytes for flushing data buffers to disk.
|
||||
///
|
||||
/// This only takes effect when [`SyncMode::SafeNoSync`] is used. When the amount of
|
||||
/// unsynced data exceeds this threshold, MDBX will automatically flush to disk.
|
||||
pub const fn with_sync_bytes(mut self, sync_bytes: Option<usize>) -> Self {
|
||||
self.sync_bytes = sync_bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the interval for periodic flushing of data buffers to disk.
|
||||
///
|
||||
/// This only takes effect when [`SyncMode::SafeNoSync`] is used. Sets the relative
|
||||
/// period since the last unsteady commit to force flush the data buffers to disk.
|
||||
pub const fn with_sync_period(mut self, sync_period: Option<Duration>) -> Self {
|
||||
self.sync_period = sync_period;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the interval for a background thread to call `mdbx_env_sync_ex`.
|
||||
///
|
||||
/// This is useful when using [`SyncMode::SafeNoSync`] to ensure data is persisted at
|
||||
/// regular intervals without waiting for the OS to flush.
|
||||
pub const fn with_periodic_sync_interval(
|
||||
mut self,
|
||||
periodic_sync_interval: Option<Duration>,
|
||||
) -> Self {
|
||||
self.periodic_sync_interval = periodic_sync_interval;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the client version if any.
|
||||
pub const fn client_version(&self) -> &ClientVersion {
|
||||
&self.client_version
|
||||
@@ -487,6 +538,18 @@ impl DatabaseEnv {
|
||||
inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
|
||||
}
|
||||
|
||||
if let Some(sync_bytes) = args.sync_bytes {
|
||||
inner_env.set_sync_bytes(sync_bytes);
|
||||
}
|
||||
|
||||
if let Some(sync_period) = args.sync_period {
|
||||
inner_env.set_sync_period(sync_period);
|
||||
}
|
||||
|
||||
if let Some(interval) = args.periodic_sync_interval {
|
||||
inner_env.set_periodic_sync_interval(interval);
|
||||
}
|
||||
|
||||
let env = Self {
|
||||
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
|
||||
dbis: Arc::default(),
|
||||
|
||||
@@ -53,6 +53,7 @@ impl Environment {
|
||||
log_level: None,
|
||||
kind: Default::default(),
|
||||
handle_slow_readers: None,
|
||||
periodic_sync_interval: None,
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
max_read_transaction_duration: None,
|
||||
}
|
||||
@@ -598,6 +599,9 @@ pub struct EnvironmentBuilder {
|
||||
log_level: Option<ffi::MDBX_log_level_t>,
|
||||
kind: EnvironmentKind,
|
||||
handle_slow_readers: Option<HandleSlowReadersCallback>,
|
||||
/// Interval for periodic sync operations. If set, a background thread will call
|
||||
/// `mdbx_env_sync_ex` at this interval to flush data buffers to disk.
|
||||
periodic_sync_interval: Option<Duration>,
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
/// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is
|
||||
/// enabled, the default value of [`DEFAULT_MAX_READ_TRANSACTION_DURATION`] is used.
|
||||
@@ -734,7 +738,7 @@ impl EnvironmentBuilder {
|
||||
let env_ptr = EnvPtr(env);
|
||||
|
||||
#[cfg(not(feature = "read-tx-timeouts"))]
|
||||
let txn_manager = TxnManager::new(env_ptr);
|
||||
let txn_manager = TxnManager::new_with_periodic_sync(env_ptr, self.periodic_sync_interval);
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
let txn_manager = {
|
||||
@@ -744,9 +748,13 @@ impl EnvironmentBuilder {
|
||||
DEFAULT_MAX_READ_TRANSACTION_DURATION,
|
||||
))
|
||||
{
|
||||
TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
|
||||
TxnManager::new_with_max_read_transaction_duration(
|
||||
env_ptr,
|
||||
duration,
|
||||
self.periodic_sync_interval,
|
||||
)
|
||||
} else {
|
||||
TxnManager::new(env_ptr)
|
||||
TxnManager::new_with_periodic_sync(env_ptr, self.periodic_sync_interval)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -873,6 +881,16 @@ impl EnvironmentBuilder {
|
||||
self.handle_slow_readers = Some(hsr);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the interval for periodic sync operations.
|
||||
///
|
||||
/// If set, a background thread will call `mdbx_env_sync_ex` at this interval to flush
|
||||
/// data buffers to disk. This is useful when using [`SyncMode::SafeNoSync`] to ensure
|
||||
/// data is persisted at regular intervals without waiting for the OS to flush.
|
||||
pub const fn set_periodic_sync_interval(&mut self, interval: Duration) -> &mut Self {
|
||||
self.periodic_sync_interval = Some(interval);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::{
|
||||
use std::{
|
||||
ptr,
|
||||
sync::mpsc::{sync_channel, Receiver, SyncSender},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
@@ -24,6 +25,7 @@ pub(crate) enum TxnManagerMessage {
|
||||
/// corresponding [`TxnManagerMessage`]
|
||||
/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
|
||||
/// `TxnManager::with_max_read_transaction_duration` is called)
|
||||
/// - Periodically syncing the environment to disk (if configured with a sync interval)
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TxnManager {
|
||||
sender: SyncSender<TxnManagerMessage>,
|
||||
@@ -32,7 +34,12 @@ pub(crate) struct TxnManager {
|
||||
}
|
||||
|
||||
impl TxnManager {
|
||||
#[allow(dead_code)]
|
||||
pub(crate) fn new(env: EnvPtr) -> Self {
|
||||
Self::new_with_periodic_sync(env, None)
|
||||
}
|
||||
|
||||
pub(crate) fn new_with_periodic_sync(env: EnvPtr, sync_interval: Option<Duration>) -> Self {
|
||||
let (tx, rx) = sync_channel(0);
|
||||
let txn_manager = Self {
|
||||
sender: tx,
|
||||
@@ -42,9 +49,45 @@ impl TxnManager {
|
||||
|
||||
txn_manager.start_message_listener(env, rx);
|
||||
|
||||
if let Some(interval) = sync_interval {
|
||||
Self::start_sync_thread(env, interval);
|
||||
}
|
||||
|
||||
txn_manager
|
||||
}
|
||||
|
||||
/// Spawns a background thread that periodically calls `mdbx_env_sync_ex` to flush
|
||||
/// data buffers to disk.
|
||||
///
|
||||
/// This is useful when using [`crate::SyncMode::SafeNoSync`] to ensure data is
|
||||
/// persisted at regular intervals without waiting for the OS to flush.
|
||||
fn start_sync_thread(env: EnvPtr, interval: Duration) {
|
||||
let task = move || {
|
||||
// Capture the EnvPtr (which is Send) into the closure
|
||||
let env = env;
|
||||
loop {
|
||||
std::thread::sleep(interval);
|
||||
|
||||
let _span = tracing::debug_span!(target: "libmdbx::sync", "sync").entered();
|
||||
|
||||
// SAFETY: The env pointer is valid for the lifetime of the Environment,
|
||||
// and this thread will terminate when the process exits.
|
||||
let result = unsafe { ffi::mdbx_env_sync_ex(env.0, false, false) };
|
||||
if result != ffi::MDBX_SUCCESS && result != ffi::MDBX_RESULT_TRUE {
|
||||
tracing::warn!(
|
||||
target: "libmdbx::sync",
|
||||
error_code = result,
|
||||
"Periodic sync failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
std::thread::Builder::new()
|
||||
.name("mdbx-rs-sync".to_string())
|
||||
.spawn(task)
|
||||
.expect("failed to spawn mdbx sync thread");
|
||||
}
|
||||
|
||||
/// Spawns a new [`std::thread`] that listens to incoming [`TxnManagerMessage`] messages,
|
||||
/// executes an FFI function, and returns the result on the provided channel.
|
||||
///
|
||||
@@ -58,6 +101,9 @@ impl TxnManager {
|
||||
match rx.recv() {
|
||||
Ok(msg) => match msg {
|
||||
TxnManagerMessage::Begin { parent, flags, sender } => {
|
||||
let _span =
|
||||
tracing::debug_span!(target: "libmdbx::txn", "begin", flags)
|
||||
.entered();
|
||||
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||
let res = mdbx_result(unsafe {
|
||||
ffi::mdbx_txn_begin_ex(
|
||||
@@ -72,9 +118,13 @@ impl TxnManager {
|
||||
sender.send(res).unwrap();
|
||||
}
|
||||
TxnManagerMessage::Abort { tx, sender } => {
|
||||
let _span =
|
||||
tracing::debug_span!(target: "libmdbx::txn", "abort").entered();
|
||||
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
|
||||
}
|
||||
TxnManagerMessage::Commit { tx, sender } => {
|
||||
let _span =
|
||||
tracing::debug_span!(target: "libmdbx::txn", "commit").entered();
|
||||
sender
|
||||
.send({
|
||||
let mut latency = CommitLatency::new();
|
||||
@@ -120,6 +170,7 @@ mod read_transactions {
|
||||
pub(crate) fn new_with_max_read_transaction_duration(
|
||||
env: EnvPtr,
|
||||
duration: Duration,
|
||||
periodic_sync_interval: Option<Duration>,
|
||||
) -> Self {
|
||||
let read_transactions = Arc::new(ReadTransactions::new(duration));
|
||||
read_transactions.clone().start_monitor();
|
||||
@@ -130,6 +181,10 @@ mod read_transactions {
|
||||
|
||||
txn_manager.start_message_listener(env, rx);
|
||||
|
||||
if let Some(interval) = periodic_sync_interval {
|
||||
Self::start_sync_thread(env, interval);
|
||||
}
|
||||
|
||||
txn_manager
|
||||
}
|
||||
|
||||
|
||||
@@ -137,7 +137,9 @@ Static Files:
|
||||
- https://publicnode.com/snapshots (full nodes & testnets)
|
||||
|
||||
If no URL is provided, the latest mainnet archive snapshot
|
||||
will be proposed for download from https://downloads.merkle.io
|
||||
will be proposed for download from https://downloads.merkle.io.
|
||||
|
||||
Local file:// URLs are also supported for extracting snapshots from disk.
|
||||
|
||||
Logging:
|
||||
--log.stdout.format <FORMAT>
|
||||
|
||||
Reference in New Issue
Block a user