mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
5 Commits
bal-devnet
...
fix/mdbx-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b9dbe5adea | ||
|
|
596a95fc04 | ||
|
|
5159f40452 | ||
|
|
7f0a6a67b1 | ||
|
|
d2a43a9288 |
@@ -92,10 +92,10 @@ impl DbTx for TxMock {
|
||||
|
||||
/// Commits the transaction.
|
||||
///
|
||||
/// **Mock behavior**: Always returns `Ok(true)`, indicating successful commit.
|
||||
/// **Mock behavior**: Always returns `Ok(())`, indicating successful commit.
|
||||
/// No actual data is persisted since this is a mock implementation.
|
||||
fn commit(self) -> Result<bool, DatabaseError> {
|
||||
Ok(true)
|
||||
fn commit(self) -> Result<(), DatabaseError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Aborts the transaction.
|
||||
|
||||
@@ -35,7 +35,7 @@ pub trait DbTx: Debug + Send {
|
||||
) -> Result<Option<T::Value>, DatabaseError>;
|
||||
/// Commit for read only transaction will consume and free transaction and allows
|
||||
/// freeing of memory pages
|
||||
fn commit(self) -> Result<bool, DatabaseError>;
|
||||
fn commit(self) -> Result<(), DatabaseError>;
|
||||
/// Aborts transaction
|
||||
fn abort(self);
|
||||
/// Iterate over read only values in table.
|
||||
|
||||
@@ -295,10 +295,18 @@ impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
})
|
||||
}
|
||||
|
||||
fn commit(self) -> Result<bool, DatabaseError> {
|
||||
fn commit(self) -> Result<(), DatabaseError> {
|
||||
self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
|
||||
match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
|
||||
Ok((v, latency)) => (Ok(v), Some(latency)),
|
||||
Ok((true, _)) => (
|
||||
Err(DatabaseError::Commit(
|
||||
"transaction was aborted due to a previous error (MDBX_RESULT_TRUE)"
|
||||
.to_string()
|
||||
.into(),
|
||||
)),
|
||||
None,
|
||||
),
|
||||
Ok((false, latency)) => (Ok(()), Some(latency)),
|
||||
Err(e) => (Err(e), None),
|
||||
}
|
||||
})
|
||||
|
||||
@@ -125,7 +125,7 @@ impl<DB: Database, N: NodeTypes> AsRef<DatabaseProvider<<DB as Database>::TXMut,
|
||||
|
||||
impl<DB: Database, N: NodeTypes + 'static> DatabaseProviderRW<DB, N> {
|
||||
/// Commit database transaction and static file if it exists.
|
||||
pub fn commit(self) -> ProviderResult<bool> {
|
||||
pub fn commit(self) -> ProviderResult<()> {
|
||||
self.0.commit()
|
||||
}
|
||||
|
||||
@@ -3422,7 +3422,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
}
|
||||
|
||||
/// Commit database transaction, static files, and pending `RocksDB` batches.
|
||||
fn commit(self) -> ProviderResult<bool> {
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
// For unwinding it makes more sense to commit the database first, since if
|
||||
// it is interrupted before the static files commit, we can just
|
||||
// truncate the static files according to the
|
||||
@@ -3453,7 +3453,7 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
||||
self.tx.commit()?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1361,38 +1361,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
Provider: DBProvider + BlockReader + StageCheckpointReader,
|
||||
{
|
||||
debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
|
||||
|
||||
// If the static file is truly empty (never had data), this is a valid scenario when
|
||||
// enabling static files for the first time on an existing node. No invariants to check.
|
||||
if highest_static_file_entry.is_none() && highest_static_file_block.is_none() {
|
||||
debug!(target: "reth::providers::static_file", ?segment, "Static file is empty, allowing fresh start");
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
|
||||
|
||||
if let Some((db_first_entry, _)) = db_cursor.first()? {
|
||||
debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
|
||||
if let (Some(highest_entry), Some(highest_block)) =
|
||||
(highest_static_file_entry, highest_static_file_block)
|
||||
{
|
||||
// If there is a gap between the entry found in static file and
|
||||
// database, then we have most likely lost static file data and need to unwind so we
|
||||
// can load it again
|
||||
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
|
||||
info!(
|
||||
target: "reth::providers::static_file",
|
||||
?db_first_entry,
|
||||
?highest_entry,
|
||||
unwind_target = highest_block,
|
||||
?segment,
|
||||
"Setting unwind target."
|
||||
);
|
||||
return Ok(Some(highest_block))
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((db_last_entry, _)) = db_cursor.last()? &&
|
||||
highest_static_file_entry
|
||||
.is_none_or(|highest_entry| db_last_entry > highest_entry)
|
||||
{
|
||||
debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
|
||||
return Ok(None)
|
||||
}
|
||||
} else {
|
||||
debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
|
||||
if let Some((db_last_entry, _)) = db_cursor.last()? &&
|
||||
highest_static_file_entry.is_none_or(|highest_entry| db_last_entry > highest_entry)
|
||||
{
|
||||
debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
|
||||
return Ok(None)
|
||||
}
|
||||
|
||||
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
|
||||
|
||||
@@ -428,8 +428,9 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
let current_block = if let Some(current_block_number) = self.current_block_number() {
|
||||
current_block_number
|
||||
} else {
|
||||
self.increment_block(0)?;
|
||||
0
|
||||
let start_block = self.writer.user_header().expected_block_start();
|
||||
self.increment_block(start_block)?;
|
||||
start_block
|
||||
};
|
||||
|
||||
match current_block.cmp(&advance_to) {
|
||||
@@ -456,6 +457,20 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
|
||||
pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
|
||||
let segment = self.writer.user_header().segment();
|
||||
|
||||
// If the static file is empty and the expected block is beyond expected_block_start,
|
||||
// we need to advance to that block first. This can happen when enabling static files
|
||||
// on an existing node where the first block to write is not at the segment boundary.
|
||||
if self.writer.user_header().block_end().is_none() {
|
||||
let start_block = self.writer.user_header().expected_block_start();
|
||||
if expected_block_number > start_block {
|
||||
// Initialize the block range and advance to expected_block_number - 1
|
||||
self.writer.user_header_mut().increment_block();
|
||||
for block in start_block + 1..expected_block_number {
|
||||
self.writer.user_header_mut().increment_block();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.check_next_block_number(expected_block_number)?;
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
@@ -267,7 +267,7 @@ impl<T: NodePrimitives, ChainSpec: EthChainSpec + 'static> DBProvider
|
||||
self.tx
|
||||
}
|
||||
|
||||
fn commit(self) -> ProviderResult<bool> {
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
Ok(self.tx.commit()?)
|
||||
}
|
||||
|
||||
|
||||
@@ -1357,7 +1357,7 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
fn commit(self) -> ProviderResult<bool> {
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
unimplemented!("commit not supported for RPC provider")
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub trait DBProvider: Sized {
|
||||
}
|
||||
|
||||
/// Commit database transaction
|
||||
fn commit(self) -> ProviderResult<bool>;
|
||||
fn commit(self) -> ProviderResult<()>;
|
||||
|
||||
/// Returns a reference to prune modes.
|
||||
fn prune_modes_ref(&self) -> &PruneModes;
|
||||
|
||||
@@ -639,7 +639,7 @@ impl<ChainSpec: Send + Sync, N: NodePrimitives> DBProvider for NoopProvider<Chai
|
||||
&self.prune_modes
|
||||
}
|
||||
|
||||
fn commit(self) -> ProviderResult<bool> {
|
||||
fn commit(self) -> ProviderResult<()> {
|
||||
use reth_db_api::transaction::DbTx;
|
||||
|
||||
Ok(self.tx.commit()?)
|
||||
|
||||
@@ -159,7 +159,7 @@ pub trait DbTx: Debug + Send + Sync {
|
||||
) -> Result<Option<T::Value>, DatabaseError>;
|
||||
/// Commit for read only transaction will consume and free transaction and allows
|
||||
/// freeing of memory pages
|
||||
fn commit(self) -> Result<bool, DatabaseError>;
|
||||
fn commit(self) -> Result<(), DatabaseError>;
|
||||
/// Aborts transaction
|
||||
fn abort(self);
|
||||
/// Iterate over read only values in table.
|
||||
|
||||
Reference in New Issue
Block a user