mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
feat(payload): propagate tracing span across payload builder channel (#22828)
This commit is contained in:
@@ -45,7 +45,7 @@ where
|
||||
return Poll::Ready(())
|
||||
};
|
||||
match cmd {
|
||||
PayloadServiceCommand::BuildNewPayload(attr, tx) => {
|
||||
PayloadServiceCommand::BuildNewPayload(attr, _, tx) => {
|
||||
let id = attr.payload_id();
|
||||
tx.send(Ok(id)).ok()
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use tokio::sync::{
|
||||
watch,
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::{debug, info, trace, warn};
|
||||
use tracing::{debug, debug_span, info, trace, warn, Span};
|
||||
|
||||
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send>>;
|
||||
|
||||
@@ -131,7 +131,8 @@ impl<T: PayloadTypes> PayloadBuilderHandle<T> {
|
||||
attr: T::PayloadBuilderAttributes,
|
||||
) -> Receiver<Result<PayloadId, PayloadBuilderError>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, tx));
|
||||
let job_span = debug_span!(parent: Span::current(), "payload_job");
|
||||
let _ = self.to_service.send(PayloadServiceCommand::BuildNewPayload(attr, job_span, tx));
|
||||
rx
|
||||
}
|
||||
|
||||
@@ -208,8 +209,10 @@ where
|
||||
{
|
||||
/// The type that knows how to create new payloads.
|
||||
generator: Gen,
|
||||
/// All active payload jobs.
|
||||
payload_jobs: Vec<(Gen::Job, PayloadId)>,
|
||||
/// All active payload jobs, each accompanied by its id and the caller's tracing span
|
||||
/// propagated across the channel so that poll and resolve work appears as children of the
|
||||
/// original Engine API request.
|
||||
payload_jobs: Vec<(Gen::Job, PayloadId, Span)>,
|
||||
/// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
|
||||
service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
|
||||
/// Receiver half of the command channel.
|
||||
@@ -279,7 +282,7 @@ where
|
||||
|
||||
/// Returns true if the given payload is currently being built.
|
||||
fn contains_payload(&self, id: PayloadId) -> bool {
|
||||
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
|
||||
self.payload_jobs.iter().any(|(_, job_id, _)| *job_id == id)
|
||||
}
|
||||
|
||||
/// Returns the best payload for the given identifier that has been built so far.
|
||||
@@ -287,8 +290,8 @@ where
|
||||
let res = self
|
||||
.payload_jobs
|
||||
.iter()
|
||||
.find(|(_, job_id)| *job_id == id)
|
||||
.map(|(j, _)| j.best_payload().map(|p| p.into()));
|
||||
.find(|(_, job_id, _)| *job_id == id)
|
||||
.map(|(j, _, _)| j.best_payload().map(|p| p.into()));
|
||||
if let Some(Ok(ref best)) = res {
|
||||
self.metrics.set_best_revenue(best.block().number(), f64::from(best.fees()));
|
||||
}
|
||||
@@ -311,12 +314,12 @@ where
|
||||
return Some(Box::pin(core::future::ready(Ok(payload.clone()))));
|
||||
}
|
||||
|
||||
let job = self.payload_jobs.iter().position(|(_, job_id)| *job_id == id)?;
|
||||
let job = self.payload_jobs.iter().position(|(_, job_id, _)| *job_id == id)?;
|
||||
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
|
||||
let payload_timestamp = self.payload_jobs[job].0.payload_timestamp();
|
||||
|
||||
if keep_alive == KeepPayloadJobAlive::No {
|
||||
let (_, id) = self.payload_jobs.swap_remove(job);
|
||||
let (_, id, _) = self.payload_jobs.swap_remove(job);
|
||||
debug!(target: "payload_builder", %id, "terminated resolved job");
|
||||
}
|
||||
|
||||
@@ -365,8 +368,8 @@ where
|
||||
let timestamp = self
|
||||
.payload_jobs
|
||||
.iter()
|
||||
.find(|(_, job_id)| *job_id == id)
|
||||
.map(|(j, _)| j.payload_timestamp());
|
||||
.find(|(_, job_id, _)| *job_id == id)
|
||||
.map(|(j, _, _)| j.payload_timestamp());
|
||||
|
||||
if timestamp.is_none() {
|
||||
trace!(target: "payload_builder", %id, "no matching payload job found to get timestamp for");
|
||||
@@ -400,10 +403,14 @@ where
|
||||
// requests
|
||||
// we don't care about the order of the jobs, so we can just swap_remove them
|
||||
for idx in (0..this.payload_jobs.len()).rev() {
|
||||
let (mut job, id) = this.payload_jobs.swap_remove(idx);
|
||||
let (mut job, id, job_span) = this.payload_jobs.swap_remove(idx);
|
||||
|
||||
// drain better payloads from the job
|
||||
match job.poll_unpin(cx) {
|
||||
let poll_result = {
|
||||
let _entered = job_span.enter();
|
||||
job.poll_unpin(cx)
|
||||
};
|
||||
|
||||
match poll_result {
|
||||
Poll::Ready(Ok(_)) => {
|
||||
this.metrics.set_active_jobs(this.payload_jobs.len());
|
||||
trace!(target: "payload_builder", %id, "payload job finished");
|
||||
@@ -414,8 +421,7 @@ where
|
||||
this.metrics.set_active_jobs(this.payload_jobs.len());
|
||||
}
|
||||
Poll::Pending => {
|
||||
// still pending, put it back
|
||||
this.payload_jobs.push((job, id));
|
||||
this.payload_jobs.push((job, id, job_span));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,21 +432,25 @@ where
|
||||
// drain all requests
|
||||
while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
|
||||
match cmd {
|
||||
PayloadServiceCommand::BuildNewPayload(attr, tx) => {
|
||||
PayloadServiceCommand::BuildNewPayload(attr, job_span, tx) => {
|
||||
let id = attr.payload_id();
|
||||
let mut res = Ok(id);
|
||||
|
||||
if this.contains_payload(id) {
|
||||
debug!(target: "payload_builder",%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
|
||||
} else {
|
||||
// no job for this payload yet, create one
|
||||
let parent = attr.parent();
|
||||
match this.generator.new_payload_job(attr.clone()) {
|
||||
let job_result = {
|
||||
let _entered = job_span.enter();
|
||||
this.generator.new_payload_job(attr.clone())
|
||||
};
|
||||
|
||||
match job_result {
|
||||
Ok(job) => {
|
||||
info!(target: "payload_builder", %id, %parent, "New payload job created");
|
||||
this.metrics.inc_initiated_jobs();
|
||||
new_job = true;
|
||||
this.payload_jobs.push((job, id));
|
||||
this.payload_jobs.push((job, id, job_span));
|
||||
this.payload_events.send(Events::Attributes(attr)).ok();
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -451,7 +461,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// return the id of the payload
|
||||
let _ = tx.send(res);
|
||||
}
|
||||
PayloadServiceCommand::BestPayload(id, tx) => {
|
||||
@@ -481,8 +490,12 @@ where
|
||||
/// Message type for the [`PayloadBuilderService`].
|
||||
pub enum PayloadServiceCommand<T: PayloadTypes> {
|
||||
/// Start building a new payload.
|
||||
///
|
||||
/// Carries the caller's [`Span`] so the service can parent payload-building work under the
|
||||
/// originating Engine API trace.
|
||||
BuildNewPayload(
|
||||
T::PayloadBuilderAttributes,
|
||||
Span,
|
||||
oneshot::Sender<Result<PayloadId, PayloadBuilderError>>,
|
||||
),
|
||||
/// Get the best payload so far
|
||||
@@ -505,7 +518,7 @@ where
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::BuildNewPayload(f0, f1) => {
|
||||
Self::BuildNewPayload(f0, _, f1) => {
|
||||
f.debug_tuple("BuildNewPayload").field(&f0).field(&f1).finish()
|
||||
}
|
||||
Self::BestPayload(f0, f1) => {
|
||||
|
||||
Reference in New Issue
Block a user