mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-02-19 03:04:27 -05:00
fix: ensure edge enables history in rocksdb (#21478)
This commit is contained in:
@@ -125,7 +125,10 @@ pub async fn setup_engine_with_chain_import(
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())?,
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)?;
|
||||
|
||||
// Initialize genesis if needed
|
||||
@@ -328,6 +331,7 @@ mod tests {
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path.clone())
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path.clone())
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -392,6 +396,7 @@ mod tests {
|
||||
reth_provider::providers::StaticFileProvider::read_only(static_files_path, false)
|
||||
.unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
@@ -490,7 +495,10 @@ mod tests {
|
||||
db.clone(),
|
||||
chain_spec.clone(),
|
||||
reth_provider::providers::StaticFileProvider::read_write(static_files_path).unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path).build().unwrap(),
|
||||
reth_provider::providers::RocksDBProvider::builder(rocksdb_dir_path)
|
||||
.with_default_tables()
|
||||
.build()
|
||||
.unwrap(),
|
||||
)
|
||||
.expect("failed to create provider factory");
|
||||
|
||||
|
||||
@@ -251,7 +251,7 @@ pub async fn test_exex_context_with_chain_spec(
|
||||
db,
|
||||
chain_spec.clone(),
|
||||
StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).build().unwrap(),
|
||||
RocksDBProvider::builder(rocksdb_dir.keep()).with_default_tables().build().unwrap(),
|
||||
)?;
|
||||
|
||||
let genesis_hash = init_genesis(&provider_factory)?;
|
||||
|
||||
@@ -63,9 +63,9 @@ impl StorageSettings {
|
||||
transaction_senders_in_static_files: true,
|
||||
account_changesets_in_static_files: true,
|
||||
storage_changesets_in_static_files: true,
|
||||
storages_history_in_rocksdb: false,
|
||||
storages_history_in_rocksdb: true,
|
||||
transaction_hash_numbers_in_rocksdb: true,
|
||||
account_history_in_rocksdb: false,
|
||||
account_history_in_rocksdb: true,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -932,27 +932,59 @@ mod tests {
|
||||
let factory = create_test_provider_factory_with_chain_spec(chain_spec);
|
||||
init_genesis(&factory).unwrap();
|
||||
|
||||
let provider = factory.provider().unwrap();
|
||||
let expected_accounts = vec![
|
||||
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
];
|
||||
let expected_storages = vec![(
|
||||
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
|
||||
IntegerList::new([0]).unwrap(),
|
||||
)];
|
||||
|
||||
let tx = provider.tx_ref();
|
||||
let collect_from_mdbx = |factory: &ProviderFactory<MockNodeTypesWithDB>| {
|
||||
let provider = factory.provider().unwrap();
|
||||
let tx = provider.tx_ref();
|
||||
(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx).unwrap(),
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx).unwrap(),
|
||||
)
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::AccountsHistory>(tx)
|
||||
.expect("failed to collect"),
|
||||
vec![
|
||||
(ShardedKey::new(address_with_balance, u64::MAX), IntegerList::new([0]).unwrap()),
|
||||
(ShardedKey::new(address_with_storage, u64::MAX), IntegerList::new([0]).unwrap())
|
||||
],
|
||||
);
|
||||
#[cfg(feature = "edge")]
|
||||
{
|
||||
let settings = factory.cached_storage_settings();
|
||||
let rocksdb = factory.rocksdb_provider();
|
||||
|
||||
assert_eq!(
|
||||
collect_table_entries::<Arc<DatabaseEnv>, tables::StoragesHistory>(tx)
|
||||
.expect("failed to collect"),
|
||||
vec![(
|
||||
StorageShardedKey::new(address_with_storage, storage_key, u64::MAX),
|
||||
IntegerList::new([0]).unwrap()
|
||||
)],
|
||||
);
|
||||
let collect_rocksdb = |rocksdb: &reth_provider::providers::RocksDBProvider| {
|
||||
(
|
||||
rocksdb
|
||||
.iter::<tables::AccountsHistory>()
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap(),
|
||||
rocksdb
|
||||
.iter::<tables::StoragesHistory>()
|
||||
.unwrap()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.unwrap(),
|
||||
)
|
||||
};
|
||||
|
||||
let (accounts, storages) = if settings.account_history_in_rocksdb {
|
||||
collect_rocksdb(&rocksdb)
|
||||
} else {
|
||||
collect_from_mdbx(&factory)
|
||||
};
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "edge"))]
|
||||
{
|
||||
let (accounts, storages) = collect_from_mdbx(&factory);
|
||||
assert_eq!(accounts, expected_accounts);
|
||||
assert_eq!(storages, expected_storages);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -3319,6 +3319,13 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Appends blocks with their execution state to the database.
|
||||
///
|
||||
/// **Note:** This function is only used in tests.
|
||||
///
|
||||
/// History indices are written to the appropriate backend based on storage settings:
|
||||
/// MDBX when `*_history_in_rocksdb` is false, `RocksDB` when true.
|
||||
///
|
||||
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
|
||||
fn append_blocks_with_state(
|
||||
&self,
|
||||
@@ -3376,8 +3383,31 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> BlockWriter
|
||||
|
||||
// Use pre-computed transitions for history indices since static file
|
||||
// writes aren't visible until commit.
|
||||
self.insert_account_history_index(account_transitions)?;
|
||||
self.insert_storage_history_index(storage_transitions)?;
|
||||
// Note: For MDBX we use insert_*_history_index. For RocksDB we use
|
||||
// append_*_history_shard which handles read-merge-write internally.
|
||||
let storage_settings = self.cached_storage_settings();
|
||||
if storage_settings.account_history_in_rocksdb {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for (address, blocks) in account_transitions {
|
||||
batch.append_account_history_shard(address, blocks)?;
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
} else {
|
||||
self.insert_account_history_index(account_transitions)?;
|
||||
}
|
||||
if storage_settings.storages_history_in_rocksdb {
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
self.with_rocksdb_batch(|mut batch| {
|
||||
for ((address, key), blocks) in storage_transitions {
|
||||
batch.append_storage_history_shard(address, key, blocks)?;
|
||||
}
|
||||
Ok(((), Some(batch.into_inner())))
|
||||
})?;
|
||||
} else {
|
||||
self.insert_storage_history_index(storage_transitions)?;
|
||||
}
|
||||
durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
|
||||
|
||||
// Update pipeline progress
|
||||
|
||||
Reference in New Issue
Block a user