mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
chore(storage): propagate span context across rayon thread boundaries (#22497)
Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -575,9 +575,13 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
|
||||
// Write to all backends in parallel.
|
||||
let runtime = &self.runtime;
|
||||
// Propagate tracing context into rayon-spawned threads so that static file
|
||||
// and RocksDB write spans appear as children of save_blocks in traces.
|
||||
let span = tracing::Span::current();
|
||||
runtime.storage_pool().in_place_scope(|s| {
|
||||
// SF writes
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
let start = Instant::now();
|
||||
sf_result = Some(
|
||||
sf_provider
|
||||
@@ -590,6 +594,7 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
|
||||
#[cfg(all(unix, feature = "rocksdb"))]
|
||||
if rocksdb_enabled {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
let start = Instant::now();
|
||||
rocksdb_result = Some(
|
||||
rocksdb_provider
|
||||
|
||||
@@ -1223,21 +1223,27 @@ impl RocksDBProvider {
|
||||
let write_account_history = ctx.storage_settings.storage_v2;
|
||||
let write_storage_history = ctx.storage_settings.storage_v2;
|
||||
|
||||
// Propagate tracing context into rayon-spawned threads so that RocksDB
|
||||
// write spans appear as children of write_blocks_data in traces.
|
||||
let span = tracing::Span::current();
|
||||
runtime.storage_pool().in_place_scope(|s| {
|
||||
if write_tx_hash {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_tx_hash = Some(self.write_tx_hash_numbers(blocks, tx_nums, &ctx));
|
||||
});
|
||||
}
|
||||
|
||||
if write_account_history {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_account_history = Some(self.write_account_history(blocks, &ctx));
|
||||
});
|
||||
}
|
||||
|
||||
if write_storage_history {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_storage_history = Some(self.write_storage_history(blocks, &ctx));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -658,6 +658,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
///
|
||||
/// The closure receives a mutable reference to the segment writer. After the closure completes,
|
||||
/// `sync_all()` is called to flush writes to disk.
|
||||
#[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
|
||||
fn write_segment<F>(
|
||||
&self,
|
||||
segment: StaticFileSegment,
|
||||
@@ -697,8 +698,12 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
let mut r_account_changesets = None;
|
||||
let mut r_storage_changesets = None;
|
||||
|
||||
// Propagate tracing context into rayon-spawned threads so that per-segment
|
||||
// write spans appear as children of write_blocks_data in traces.
|
||||
let span = tracing::Span::current();
|
||||
runtime.storage_pool().in_place_scope(|s| {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_headers =
|
||||
Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
|
||||
Self::write_headers(w, blocks)
|
||||
@@ -706,6 +711,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
});
|
||||
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_txs = Some(self.write_segment(
|
||||
StaticFileSegment::Transactions,
|
||||
first_block_number,
|
||||
@@ -715,6 +721,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
if ctx.write_senders {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_senders = Some(self.write_segment(
|
||||
StaticFileSegment::TransactionSenders,
|
||||
first_block_number,
|
||||
@@ -725,6 +732,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
if ctx.write_receipts {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_receipts = Some(self.write_segment(
|
||||
StaticFileSegment::Receipts,
|
||||
first_block_number,
|
||||
@@ -735,6 +743,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
if ctx.write_account_changesets {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_account_changesets = Some(self.write_segment(
|
||||
StaticFileSegment::AccountChangeSets,
|
||||
first_block_number,
|
||||
@@ -745,6 +754,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
|
||||
|
||||
if ctx.write_storage_changesets {
|
||||
s.spawn(|_| {
|
||||
let _guard = span.enter();
|
||||
r_storage_changesets = Some(self.write_segment(
|
||||
StaticFileSegment::StorageChangeSets,
|
||||
first_block_number,
|
||||
|
||||
Reference in New Issue
Block a user