From 37709c5a998a8844bf7655a439ec2bb39afb479b Mon Sep 17 00:00:00 2001 From: YK Date: Fri, 6 Mar 2026 19:46:20 +0900 Subject: [PATCH] feat(payload): propagate tracing span across payload builder channel (#22828) --- crates/payload/builder/src/noop.rs | 2 +- crates/payload/builder/src/service.rs | 57 ++++++++++++++++----------- 2 files changed, 36 insertions(+), 23 deletions(-) diff --git a/crates/payload/builder/src/noop.rs b/crates/payload/builder/src/noop.rs index 3628ef83c0..9e44d17bd8 100644 --- a/crates/payload/builder/src/noop.rs +++ b/crates/payload/builder/src/noop.rs @@ -45,7 +45,7 @@ where return Poll::Ready(()) }; match cmd { - PayloadServiceCommand::BuildNewPayload(attr, tx) => { + PayloadServiceCommand::BuildNewPayload(attr, _, tx) => { let id = attr.payload_id(); tx.send(Ok(id)).ok() } diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 507b302651..61c6912cd4 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -28,7 +28,7 @@ use tokio::sync::{ watch, }; use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, debug_span, info, trace, warn, Span}; type PayloadFuture

= Pin> + Send>>; @@ -131,7 +131,8 @@ impl PayloadBuilderHandle { attr: T::PayloadBuilderAttributes, ) -> Receiver> { let (tx, rx) = oneshot::channel(); - let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx)); + let job_span = debug_span!(parent: Span::current(), "payload_job"); + let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, job_span, tx)); rx } @@ -208,8 +209,10 @@ where { /// The type that knows how to create new payloads. generator: Gen, - /// All active payload jobs. - payload_jobs: Vec<(Gen::Job, PayloadId)>, + /// All active payload jobs, each accompanied by its id and the caller's tracing span + /// propagated across the channel so that poll and resolve work appears as children of the + /// original Engine API request. + payload_jobs: Vec<(Gen::Job, PayloadId, Span)>, /// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand. service_tx: mpsc::UnboundedSender>, /// Receiver half of the command channel. @@ -279,7 +282,7 @@ where /// Returns true if the given payload is currently being built. fn contains_payload(&self, id: PayloadId) -> bool { - self.payload_jobs.iter().any(|(_, job_id)| *job_id == id) + self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id) } /// Returns the best payload for the given identifier that has been built so far. @@ -287,8 +290,8 @@ where let res = self .payload_jobs .iter() - .find(|(_, job_id)| *job_id == id) - .map(|(j, _)| j.best_payload().map(|p| p.into())); + .find(|(_, job_id, _)| *job_id == id) + .map(|(j, _, _)| j.best_payload().map(|p| p.into())); if let Some(Ok(ref best)) = res { self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees())); } @@ -311,12 +314,12 @@ where return Some(Box::pin(core::future::ready(Ok(payload.clone())))); } - let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?; + let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?; let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind); let payload_timestamp = self.payload_jobs[job].0.payload_timestamp(); if keep_alive == KeepPayloadJobAlive::No { - let (_, id) = self.payload_jobs.swap_remove(job); + let (_, id, _) = self.payload_jobs.swap_remove(job); debug!(target: "payload_builder", %id, "terminated resolved job"); } @@ -365,8 +368,8 @@ where let timestamp = self .payload_jobs .iter() - .find(|(_, job_id)| *job_id == id) - .map(|(j, _)| j.payload_timestamp()); + .find(|(_, job_id, _)| *job_id == id) + .map(|(j, _, _)| j.payload_timestamp()); if timestamp.is_none() { trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for"); @@ -400,10 +403,14 @@ where // requests // we don't care about the order of the jobs, so we can just swap_remove them for idx in (0..this.payload_jobs.len()).rev() { - let (mut job, id) = this.payload_jobs.swap_remove(idx); + let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx); - // drain better payloads from the job - match job.poll_unpin(cx) { + let poll_result = { + let _entered = job_span.enter(); + job.poll_unpin(cx) + }; + + match poll_result { Poll::Ready(Ok(_)) => { this.metrics.set_active_jobs(this.payload_jobs.len()); trace!(target: "payload_builder", %id, "payload job finished"); @@ -414,8 +421,7 @@ where this.metrics.set_active_jobs(this.payload_jobs.len()); } Poll::Pending => { - // still pending, put it back - this.payload_jobs.push((job, id)); + this.payload_jobs.push((job, id, job_span)); } } } @@ -426,21 +432,25 @@ where // drain all requests while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) { match cmd { - PayloadServiceCommand::BuildNewPayload(attr, tx) => { + PayloadServiceCommand::BuildNewPayload(attr, job_span, tx) => { let id = attr.payload_id(); let mut res = Ok(id); if this.contains_payload(id) { debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring."); } else { - // no job for this payload yet, create one let parent = attr.parent(); - match this.generator.new_payload_job(attr.clone()) { + let job_result = { + let _entered = job_span.enter(); + this.generator.new_payload_job(attr.clone()) + }; + + match job_result { Ok(job) => { info!(target: "payload_builder", %id, %parent, "New payload job created"); this.metrics.inc_initiated_jobs(); new_job = true; - this.payload_jobs.push((job, id)); + this.payload_jobs.push((job, id, job_span)); this.payload_events.send(Events::Attributes(attr)).ok(); } Err(err) => { @@ -451,7 +461,6 @@ where } } - // return the id of the payload let _ = tx.send(res); } PayloadServiceCommand::BestPayload(id, tx) => { @@ -481,8 +490,12 @@ where /// Message type for the [`PayloadBuilderService`]. pub enum PayloadServiceCommand { /// Start building a new payload. + /// + /// Carries the caller's [`Span`] so the service can parent payload-building work under the + /// originating Engine API trace. BuildNewPayload( T::PayloadBuilderAttributes, + Span, oneshot::Sender>, ), /// Get the best payload so far @@ -505,7 +518,7 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Self::BuildNewPayload(f0, f1) => { + Self::BuildNewPayload(f0, _, f1) => { f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish() } Self::BestPayload(f0, f1) => {