refactor(stages): use with_rocksdb_batch_auto_commit in tx_lookup (#21722)

This commit is contained in:
YK
2026-02-03 22:35:07 +08:00
committed by GitHub
parent 6b7cc00289
commit 7f970e136a

View File

@@ -161,37 +161,34 @@ where
provider.count_entries::<tables::TransactionHashNumbers>()?.is_zero();
// Auto-commits on threshold; consistency check heals any crash.
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch_with_auto_commit();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| {
let mut writer =
EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash_bytes, number_bytes) = hash_to_number?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
let iter = hash_collector
.iter()
.map_err(|e| ProviderError::other(Box::new(e)))?;
for (index, hash_to_number) in iter.enumerate() {
let (hash_bytes, number_bytes) =
hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?;
if index > 0 && index.is_multiple_of(interval) {
info!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Decode from raw ETL bytes
let hash = TxHash::decode(&hash_bytes)?;
let tx_num = TxNumber::decompress(&number_bytes)?;
writer.put_transaction_hash_number(hash, tx_num, append_only)?;
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
trace!(target: "sync::stages::transaction_lookup",
total_hashes,
@@ -223,36 +220,31 @@ where
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb = provider.rocksdb_provider();
#[cfg(all(unix, feature = "rocksdb"))]
let rocksdb_batch = rocksdb.batch();
#[cfg(not(all(unix, feature = "rocksdb")))]
let rocksdb_batch = ();
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
provider.with_rocksdb_batch(|rocksdb_batch| {
let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?;
let static_file_provider = provider.static_file_provider();
let rev_walker =
provider.block_body_indices_range(range.clone())?.into_iter().rev().zip(range.rev());
let static_file_provider = provider.static_file_provider();
let rev_walker = provider
.block_body_indices_range(range.clone())?
.into_iter()
.rev()
.zip(range.rev());
for (body, number) in rev_walker {
if number <= unwind_to {
break;
}
for (body, number) in rev_walker {
if number <= unwind_to {
break;
}
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
// Delete all transactions that belong to this block
for tx_id in body.tx_num_range() {
if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? {
writer.delete_transaction_hash_number(transaction.trie_hash())?;
}
}
}
}
// Extract and register RocksDB batch for commit at provider level
#[cfg(all(unix, feature = "rocksdb"))]
if let Some(batch) = writer.into_raw_rocksdb_batch() {
provider.set_pending_rocksdb_batch(batch);
}
Ok(((), writer.into_raw_rocksdb_batch()))
})?;
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)