diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 7cf77269d5..1148eb443a 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -161,37 +161,34 @@ where provider.count_entries::()?.is_zero(); // Auto-commits on threshold; consistency check heals any crash. - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb = provider.rocksdb_provider(); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_batch = rocksdb.batch_with_auto_commit(); - #[cfg(not(all(unix, feature = "rocksdb")))] - let rocksdb_batch = (); - let mut writer = - EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; + provider.with_rocksdb_batch_auto_commit(|rocksdb_batch| { + let mut writer = + EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; - for (index, hash_to_number) in hash_collector.iter()?.enumerate() { - let (hash_bytes, number_bytes) = hash_to_number?; - if index > 0 && index.is_multiple_of(interval) { - info!( - target: "sync::stages::transaction_lookup", - ?append_only, - progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), - "Inserting hashes" - ); + let iter = hash_collector + .iter() + .map_err(|e| ProviderError::other(Box::new(e)))?; + + for (index, hash_to_number) in iter.enumerate() { + let (hash_bytes, number_bytes) = + hash_to_number.map_err(|e| ProviderError::other(Box::new(e)))?; + if index > 0 && index.is_multiple_of(interval) { + info!( + target: "sync::stages::transaction_lookup", + ?append_only, + progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), + "Inserting hashes" + ); + } + + // Decode from raw ETL bytes + let hash = TxHash::decode(&hash_bytes)?; + let tx_num = TxNumber::decompress(&number_bytes)?; + writer.put_transaction_hash_number(hash, tx_num, append_only)?; } - // Decode from raw ETL bytes - let hash = TxHash::decode(&hash_bytes)?; - let tx_num = TxNumber::decompress(&number_bytes)?; - writer.put_transaction_hash_number(hash, tx_num, append_only)?; - } - - // Extract and register RocksDB batch for commit at provider level - #[cfg(all(unix, feature = "rocksdb"))] - if let Some(batch) = writer.into_raw_rocksdb_batch() { - provider.set_pending_rocksdb_batch(batch); - } + Ok(((), writer.into_raw_rocksdb_batch())) + })?; trace!(target: "sync::stages::transaction_lookup", total_hashes, @@ -223,36 +220,31 @@ where ) -> Result { let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb = provider.rocksdb_provider(); - #[cfg(all(unix, feature = "rocksdb"))] - let rocksdb_batch = rocksdb.batch(); - #[cfg(not(all(unix, feature = "rocksdb")))] - let rocksdb_batch = (); - let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; + provider.with_rocksdb_batch(|rocksdb_batch| { + let mut writer = EitherWriter::new_transaction_hash_numbers(provider, rocksdb_batch)?; - let static_file_provider = provider.static_file_provider(); - let rev_walker = - provider.block_body_indices_range(range.clone())?.into_iter().rev().zip(range.rev()); + let static_file_provider = provider.static_file_provider(); + let rev_walker = provider + .block_body_indices_range(range.clone())? + .into_iter() + .rev() + .zip(range.rev()); - for (body, number) in rev_walker { - if number <= unwind_to { - break; - } + for (body, number) in rev_walker { + if number <= unwind_to { + break; + } - // Delete all transactions that belong to this block - for tx_id in body.tx_num_range() { - if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? { - writer.delete_transaction_hash_number(transaction.trie_hash())?; + // Delete all transactions that belong to this block + for tx_id in body.tx_num_range() { + if let Some(transaction) = static_file_provider.transaction_by_id(tx_id)? { + writer.delete_transaction_hash_number(transaction.trie_hash())?; + } } } - } - // Extract and register RocksDB batch for commit at provider level - #[cfg(all(unix, feature = "rocksdb"))] - if let Some(batch) = writer.into_raw_rocksdb_batch() { - provider.set_pending_rocksdb_batch(batch); - } + Ok(((), writer.into_raw_rocksdb_batch())) + })?; Ok(UnwindOutput { checkpoint: StageCheckpoint::new(unwind_to)