mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fb9d04948 | ||
|
|
40a72a8720 |
@@ -1,11 +1,19 @@
|
||||
use crate::stages::utils::collect_history_indices;
|
||||
|
||||
use super::{collect_account_history_indices, load_history_indices};
|
||||
use super::collect_account_history_indices;
|
||||
use alloy_primitives::Address;
|
||||
use reth_config::config::{EtlConfig, IndexHistoryConfig};
|
||||
use reth_db_api::{models::ShardedKey, table::Decode, tables, transaction::DbTxMut};
|
||||
use reth_db_api::{
|
||||
cursor::DbCursorRO,
|
||||
models::{sharded_key::NUM_OF_INDICES_IN_SHARD, ShardedKey},
|
||||
table::{Decode, Decompress},
|
||||
tables,
|
||||
transaction::{DbTx, DbTxMut},
|
||||
BlockNumberList,
|
||||
};
|
||||
use reth_provider::{
|
||||
DBProvider, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageSettingsCache,
|
||||
DBProvider, EitherWriter, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
RocksDBProviderFactory, StorageSettingsCache,
|
||||
};
|
||||
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment};
|
||||
use reth_stages_api::{
|
||||
@@ -53,7 +61,8 @@ where
|
||||
+ PruneCheckpointWriter
|
||||
+ reth_storage_api::ChangeSetReader
|
||||
+ reth_provider::StaticFileProviderFactory
|
||||
+ StorageSettingsCache,
|
||||
+ StorageSettingsCache
|
||||
+ RocksDBProviderFactory,
|
||||
{
|
||||
/// Return the id of the stage
|
||||
fn id(&self) -> StageId {
|
||||
@@ -102,14 +111,26 @@ where
|
||||
let mut range = input.next_block_range();
|
||||
let first_sync = input.checkpoint().block_number == 0;
|
||||
|
||||
// Check if we're using RocksDB for account history
|
||||
let use_rocksdb = provider.cached_storage_settings().account_history_in_rocksdb;
|
||||
|
||||
// On first sync we might have history coming from genesis. We clear the table since it's
|
||||
// faster to rebuild from scratch.
|
||||
if first_sync {
|
||||
provider.tx_ref().clear::<tables::AccountsHistory>()?;
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if use_rocksdb {
|
||||
provider.rocksdb_provider().clear_table::<tables::AccountsHistory>()?;
|
||||
}
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let _ = use_rocksdb;
|
||||
|
||||
if !use_rocksdb {
|
||||
provider.tx_ref().clear::<tables::AccountsHistory>()?;
|
||||
}
|
||||
range = 0..=*input.next_block_range().end();
|
||||
}
|
||||
|
||||
info!(target: "sync::stages::index_account_history::exec", ?first_sync, "Collecting indices");
|
||||
info!(target: "sync::stages::index_account_history::exec", ?first_sync, ?use_rocksdb, "Collecting indices");
|
||||
|
||||
let collector = if provider.cached_storage_settings().account_changesets_in_static_files {
|
||||
// Use the provider-based collection that can read from static files.
|
||||
@@ -125,14 +146,26 @@ where
|
||||
};
|
||||
|
||||
info!(target: "sync::stages::index_account_history::exec", "Loading indices into database");
|
||||
load_history_indices::<_, tables::AccountsHistory, _>(
|
||||
provider,
|
||||
collector,
|
||||
first_sync,
|
||||
ShardedKey::new,
|
||||
ShardedKey::<Address>::decode_owned,
|
||||
|key| key.key,
|
||||
)?;
|
||||
|
||||
// Create RocksDB batch if feature is enabled
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
let rocksdb_batch = rocksdb.batch();
|
||||
#[cfg(not(all(unix, feature = "rocksdb")))]
|
||||
let rocksdb_batch = ();
|
||||
|
||||
// Create writer that routes to either MDBX or RocksDB based on settings
|
||||
let mut writer = EitherWriter::new_accounts_history(provider, rocksdb_batch)?;
|
||||
|
||||
// Load indices using the writer
|
||||
load_account_history_indices_with_writer(provider, &mut writer, collector, first_sync)?;
|
||||
|
||||
// Extract and register RocksDB batch for commit at provider level
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if let Some(batch) = writer.into_raw_rocksdb_batch() {
|
||||
provider.set_pending_rocksdb_batch(batch);
|
||||
}
|
||||
|
||||
Ok(ExecOutput { checkpoint: StageCheckpoint::new(*range.end()), done: true })
|
||||
}
|
||||
@@ -153,6 +186,144 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads account history indices from a collector into the database using an [`EitherWriter`].
|
||||
///
|
||||
/// This function processes entries from the collector, grouping indices by address and
|
||||
/// writing them as shards when they reach the maximum shard size. It handles merging
|
||||
/// with existing shards during incremental syncs.
|
||||
fn load_account_history_indices_with_writer<Provider, CURSOR, N>(
|
||||
provider: &Provider,
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
mut collector: reth_etl::Collector<ShardedKey<Address>, BlockNumberList>,
|
||||
append_only: bool,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
Provider: DBProvider<Tx: DbTxMut> + StorageSettingsCache + RocksDBProviderFactory,
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
let mut current_address = Address::ZERO;
|
||||
let mut current_list = Vec::<u64>::new();
|
||||
|
||||
// observability
|
||||
let total_entries = collector.len();
|
||||
let interval = (total_entries / 10).max(1);
|
||||
|
||||
for (index, element) in collector.iter()?.enumerate() {
|
||||
let (k, v) = element?;
|
||||
let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
|
||||
let new_list = BlockNumberList::decompress_owned(v)?;
|
||||
|
||||
if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
|
||||
info!(target: "sync::stages::index_account_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
|
||||
}
|
||||
|
||||
let address = sharded_key.key;
|
||||
|
||||
if current_address != address {
|
||||
// We have reached the end of this address, flush remaining indices
|
||||
flush_account_history_shards(writer, current_address, &mut current_list, append_only)?;
|
||||
current_address = address;
|
||||
current_list.clear();
|
||||
|
||||
// If it's not the first sync, merge with existing last shard
|
||||
if !append_only &&
|
||||
let Some(existing_list) =
|
||||
get_last_account_history_shard(provider, current_address)?
|
||||
{
|
||||
current_list.extend(existing_list.iter());
|
||||
}
|
||||
}
|
||||
|
||||
current_list.extend(new_list.iter());
|
||||
|
||||
// Write full shards, keep the partial shard in memory
|
||||
write_full_shards(writer, current_address, &mut current_list)?;
|
||||
}
|
||||
|
||||
// Flush the last address's remaining shard
|
||||
flush_account_history_shards(writer, current_address, &mut current_list, append_only)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves the last shard for an account from `RocksDB` or MDBX based on storage settings.
|
||||
fn get_last_account_history_shard<Provider>(
|
||||
provider: &Provider,
|
||||
address: Address,
|
||||
) -> Result<Option<BlockNumberList>, StageError>
|
||||
where
|
||||
Provider: DBProvider + StorageSettingsCache + RocksDBProviderFactory,
|
||||
{
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if provider.cached_storage_settings().account_history_in_rocksdb {
|
||||
let rocksdb = provider.rocksdb_provider();
|
||||
let key = ShardedKey::last(address);
|
||||
return Ok(rocksdb.get::<tables::AccountsHistory>(key)?);
|
||||
}
|
||||
|
||||
// Read from MDBX
|
||||
let key = ShardedKey::last(address);
|
||||
let mut cursor = provider.tx_ref().cursor_read::<tables::AccountsHistory>()?;
|
||||
Ok(cursor.seek_exact(key)?.map(|(_, v)| v))
|
||||
}
|
||||
|
||||
/// Writes full shards from `list` to the database, draining them from the list.
|
||||
fn write_full_shards<CURSOR, N>(
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
while list.len() > NUM_OF_INDICES_IN_SHARD {
|
||||
let chunk: Vec<u64> = list.drain(..NUM_OF_INDICES_IN_SHARD).collect();
|
||||
let highest = *chunk.last().expect("chunk is not empty");
|
||||
let key = ShardedKey::new(address, highest);
|
||||
let value = BlockNumberList::new_pre_sorted(chunk);
|
||||
writer.put_account_history(key, &value)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flushes all remaining shards to the database. The last shard gets `u64::MAX` as its highest
|
||||
/// block.
|
||||
fn flush_account_history_shards<CURSOR, N>(
|
||||
writer: &mut EitherWriter<'_, CURSOR, N>,
|
||||
address: Address,
|
||||
list: &mut Vec<u64>,
|
||||
append_only: bool,
|
||||
) -> Result<(), StageError>
|
||||
where
|
||||
CURSOR: reth_db_api::cursor::DbCursorRW<tables::AccountsHistory>
|
||||
+ reth_db_api::cursor::DbCursorRO<tables::AccountsHistory>,
|
||||
N: reth_primitives_traits::NodePrimitives,
|
||||
{
|
||||
if list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
write_full_shards(writer, address, list)?;
|
||||
|
||||
// The last shard always uses u64::MAX
|
||||
if !list.is_empty() {
|
||||
let key = ShardedKey::last(address);
|
||||
let value = BlockNumberList::new_pre_sorted(list.drain(..));
|
||||
|
||||
// For incremental sync, delete the old last shard first (it will be replaced)
|
||||
if !append_only {
|
||||
writer.delete_account_history(key.clone())?;
|
||||
}
|
||||
writer.put_account_history(key, &value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -646,4 +817,187 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
mod rocksdb_tests {
    use super::*;
    use reth_provider::RocksDBProviderFactory;
    use reth_storage_api::StorageSettings;

    /// Creates a test database whose storage settings route account history to `RocksDB`.
    fn rocksdb_history_db() -> TestStageDB {
        let db = TestStageDB::default();
        db.factory.set_storage_settings_cache(
            StorageSettings::legacy().with_account_history_in_rocksdb(true),
        );
        db
    }

    /// Inserts body indices and one account changeset for every block in `blocks`.
    fn insert_changesets(db: &TestStageDB, blocks: std::ops::RangeInclusive<u64>) {
        db.commit(|tx| {
            for block in blocks {
                tx.put::<tables::BlockBodyIndices>(
                    block,
                    StoredBlockBodyIndices { tx_count: 3, ..Default::default() },
                )?;
                tx.put::<tables::AccountChangeSets>(block, acc())?;
            }
            Ok(())
        })
        .unwrap();
    }

    /// Reads the last (`u64::MAX`) shard from `RocksDB` and returns its block numbers, if any.
    fn last_shard_blocks(db: &TestStageDB) -> Option<Vec<u64>> {
        db.factory
            .rocksdb_provider()
            .get::<tables::AccountsHistory>(shard(u64::MAX))
            .unwrap()
            .map(|list| list.iter().collect())
    }

    /// Test that when `account_history_in_rocksdb` is enabled, the stage
    /// writes account history indices to RocksDB instead of MDBX.
    #[tokio::test]
    async fn execute_writes_to_rocksdb_when_enabled() {
        // init
        let db = rocksdb_history_db();

        // setup - create changesets for blocks 0-10
        insert_changesets(&db, 0..=10);

        // Run stage
        let input = ExecInput { target: Some(10), ..Default::default() };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        // Verify MDBX table is empty (data should be in RocksDB)
        let mdbx_table = db.table::<tables::AccountsHistory>().unwrap();
        assert!(
            mdbx_table.is_empty(),
            "MDBX AccountsHistory should be empty when RocksDB is enabled"
        );

        // Verify RocksDB has the data
        let blocks = last_shard_blocks(&db);
        assert!(blocks.is_some(), "RocksDB should contain account history");
        assert_eq!(blocks.unwrap(), (0..=10).collect::<Vec<_>>());
    }

    /// Test that unwinding succeeds and updates the checkpoint when
    /// `account_history_in_rocksdb` is enabled.
    ///
    /// Note: Full unwind support for RocksDB requires updates to the HistoryWriter trait
    /// implementation, so this test does NOT yet assert that indices are removed from
    /// RocksDB; it asserts that the data written by `execute` is still present after unwind.
    #[tokio::test]
    async fn unwind_deletes_from_rocksdb_when_enabled() {
        // init
        let db = rocksdb_history_db();

        // setup - create changesets for blocks 0-10
        insert_changesets(&db, 0..=10);

        // Run stage to populate data
        let input = ExecInput { target: Some(10), ..Default::default() };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        // Verify RocksDB has the data before unwind
        assert!(last_shard_blocks(&db).is_some(), "RocksDB should have data before unwind");

        // Unwind to block 5
        // Note: The current HistoryWriter implementation doesn't yet support RocksDB unwind.
        // This test verifies the unwind doesn't panic and properly updates checkpoints.
        let unwind_input =
            UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 5, bad_block: None };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.unwind(&provider, unwind_input).unwrap();
        assert_eq!(out, UnwindOutput { checkpoint: StageCheckpoint::new(5) });
        provider.commit().unwrap();

        // Verify RocksDB data still exists (unwind currently uses MDBX path which doesn't
        // affect RocksDB). Once HistoryWriter is updated for RocksDB, this should verify
        // that blocks 6-10 are removed.
        assert!(last_shard_blocks(&db).is_some(), "RocksDB should still have data");
    }

    /// Test incremental sync - merging new data with existing data.
    #[tokio::test]
    async fn execute_incremental_sync() {
        // init
        let db = rocksdb_history_db();

        // setup - first batch of changesets (blocks 0-5)
        insert_changesets(&db, 0..=5);

        // First sync (blocks 0-5)
        let input = ExecInput { target: Some(5), ..Default::default() };
        let mut stage = IndexAccountHistoryStage::default();
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(5), done: true });
        provider.commit().unwrap();

        // Verify first sync data in RocksDB
        let blocks = last_shard_blocks(&db);
        assert!(blocks.is_some());
        assert_eq!(blocks.unwrap(), (0..=5).collect::<Vec<_>>());

        // Add more changesets for blocks 6-10
        insert_changesets(&db, 6..=10);

        // Second sync (blocks 6-10)
        let input = ExecInput { target: Some(10), checkpoint: Some(StageCheckpoint::new(5)) };
        let provider = db.factory.database_provider_rw().unwrap();
        let out = stage.execute(&provider, input).unwrap();
        assert_eq!(out, ExecOutput { checkpoint: StageCheckpoint::new(10), done: true });
        provider.commit().unwrap();

        // Verify merged data - should have blocks 0-10
        let blocks = last_shard_blocks(&db);
        assert!(blocks.is_some(), "RocksDB should have merged data");
        assert_eq!(
            blocks.unwrap(),
            (0..=10).collect::<Vec<_>>(),
            "Should have merged blocks 0-10"
        );
    }
}
|
||||
}
|
||||
|
||||
@@ -514,6 +514,34 @@ impl RocksDBProvider {
|
||||
})
|
||||
}
|
||||
|
||||
/// Clears all entries from the specified table.
|
||||
///
|
||||
/// This iterates over all keys and deletes them in a single batch.
|
||||
pub fn clear_table<T: Table>(&self) -> ProviderResult<()> {
|
||||
self.execute_with_operation_metric(RocksDBOperation::BatchWrite, T::NAME, |this| {
|
||||
let cf = this.get_cf_handle::<T>()?;
|
||||
let mut batch = WriteBatchWithTransaction::<true>::default();
|
||||
|
||||
let iter = this.0.db.iterator_cf(cf, IteratorMode::Start);
|
||||
for result in iter {
|
||||
let (key, _) = result.map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Read(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})?;
|
||||
batch.delete_cf(cf, &key);
|
||||
}
|
||||
|
||||
this.0.db.write_opt(batch, &WriteOptions::default()).map_err(|e| {
|
||||
ProviderError::Database(DatabaseError::Commit(DatabaseErrorInfo {
|
||||
message: e.to_string().into(),
|
||||
code: -1,
|
||||
}))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Writes all `RocksDB` data for multiple blocks in parallel.
|
||||
///
|
||||
/// This handles transaction hash numbers, account history, and storage history based on
|
||||
|
||||
Reference in New Issue
Block a user