mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
feat(storage): split static file commit into sync_all and finalize (#20984)
This commit is contained in:
@@ -20,6 +20,9 @@ pub enum ProviderError {
|
||||
/// Pruning error.
|
||||
#[error(transparent)]
|
||||
Pruning(#[from] PruneSegmentError),
|
||||
/// Static file writer error.
|
||||
#[error(transparent)]
|
||||
StaticFileWriter(#[from] StaticFileWriterError),
|
||||
/// RLP error.
|
||||
#[error("{_0}")]
|
||||
Rlp(alloy_rlp::Error),
|
||||
@@ -216,18 +219,21 @@ pub struct RootMismatch {
|
||||
pub block_hash: BlockHash,
|
||||
}
|
||||
|
||||
/// A Static File Write Error.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("{message}")]
|
||||
pub struct StaticFileWriterError {
|
||||
/// The error message.
|
||||
pub message: String,
|
||||
/// A Static File Writer Error.
|
||||
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
|
||||
pub enum StaticFileWriterError {
|
||||
/// Cannot call `sync_all` or `finalize` when prune is queued.
|
||||
#[error("cannot call sync_all or finalize when prune is queued, use commit() instead")]
|
||||
FinalizeWithPruneQueued,
|
||||
/// Other error with message.
|
||||
#[error("{0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
impl StaticFileWriterError {
|
||||
/// Creates a new [`StaticFileWriterError`] with the given message.
|
||||
/// Creates a new [`StaticFileWriterError::Other`] with the given message.
|
||||
pub fn new(message: impl Into<String>) -> Self {
|
||||
Self { message: message.into() }
|
||||
Self::Other(message.into())
|
||||
}
|
||||
}
|
||||
/// Consistent database view error.
|
||||
|
||||
@@ -347,11 +347,27 @@ impl<H: NippyJarHeader> NippyJarWriter<H> {
|
||||
|
||||
/// Commits configuration and offsets to disk. It drains the internal offset list.
|
||||
pub fn commit(&mut self) -> Result<(), NippyJarError> {
|
||||
self.sync_all()?;
|
||||
self.finalize()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Syncs data and offsets to disk.
|
||||
///
|
||||
/// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
|
||||
/// configuration and mark the writer as clean.
|
||||
pub fn sync_all(&mut self) -> Result<(), NippyJarError> {
|
||||
self.data_file.flush()?;
|
||||
self.data_file.get_ref().sync_all()?;
|
||||
|
||||
self.commit_offsets()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commits configuration to disk and marks the writer as clean.
|
||||
///
|
||||
/// Must be called after [`Self::sync_all`] to complete the commit.
|
||||
pub fn finalize(&mut self) -> Result<(), NippyJarError> {
|
||||
// Flushes `max_row_size` and total `rows` to disk.
|
||||
self.jar.freeze_config()?;
|
||||
self.dirty = false;
|
||||
|
||||
@@ -1830,6 +1830,11 @@ pub trait StaticFileWriter {
|
||||
|
||||
/// Returns `true` if the static file provider has unwind queued.
|
||||
fn has_unwind_queued(&self) -> bool;
|
||||
|
||||
/// Finalizes all static file writers by committing their configuration to disk.
|
||||
///
|
||||
/// Returns an error if prune is queued (use [`Self::commit`] instead).
|
||||
fn finalize(&self) -> ProviderResult<()>;
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
|
||||
@@ -1868,6 +1873,10 @@ impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
|
||||
fn has_unwind_queued(&self) -> bool {
|
||||
self.writers.has_unwind_queued()
|
||||
}
|
||||
|
||||
fn finalize(&self) -> ProviderResult<()> {
|
||||
self.writers.finalize()
|
||||
}
|
||||
}
|
||||
|
||||
impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
|
||||
|
||||
@@ -141,6 +141,30 @@ impl<N: NodePrimitives> StaticFileWriters<N> {
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Finalizes all writers by committing their configuration to disk and updating indices.
|
||||
///
|
||||
/// Must be called after `sync_all` was called on individual writers.
|
||||
/// Returns an error if any writer has prune queued.
|
||||
pub(crate) fn finalize(&self) -> ProviderResult<()> {
|
||||
debug!(target: "provider::static_file", "Finalizing all static file segments into disk");
|
||||
|
||||
for writer_lock in [
|
||||
&self.headers,
|
||||
&self.transactions,
|
||||
&self.receipts,
|
||||
&self.transaction_senders,
|
||||
&self.account_change_sets,
|
||||
] {
|
||||
let mut writer = writer_lock.write();
|
||||
if let Some(writer) = writer.as_mut() {
|
||||
writer.finalize()?;
|
||||
}
|
||||
}
|
||||
|
||||
debug!(target: "provider::static_file", "Finalized all static file segments into disk");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
|
||||
@@ -298,6 +322,38 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
self.prune_on_commit.is_some()
|
||||
}
|
||||
|
||||
/// Syncs all data (rows and offsets) to disk.
|
||||
///
|
||||
/// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
|
||||
/// configuration and mark the writer as clean.
|
||||
///
|
||||
/// Returns an error if prune is queued (use [`Self::commit`] instead).
|
||||
pub fn sync_all(&mut self) -> ProviderResult<()> {
|
||||
if self.prune_on_commit.is_some() {
|
||||
return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
|
||||
}
|
||||
if self.writer.is_dirty() {
|
||||
self.writer.sync_all().map_err(ProviderError::other)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commits configuration to disk and updates the reader index.
|
||||
///
|
||||
/// Must be called after [`Self::sync_all`] to complete the commit.
|
||||
///
|
||||
/// Returns an error if prune is queued (use [`Self::commit`] instead).
|
||||
pub fn finalize(&mut self) -> ProviderResult<()> {
|
||||
if self.prune_on_commit.is_some() {
|
||||
return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
|
||||
}
|
||||
if self.writer.is_dirty() {
|
||||
self.writer.finalize().map_err(ProviderError::other)?;
|
||||
self.update_index()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commits configuration changes to disk and updates the reader index with the new changes.
|
||||
pub fn commit(&mut self) -> ProviderResult<()> {
|
||||
let start = Instant::now();
|
||||
|
||||
Reference in New Issue
Block a user