fix: spawn block fetching blocking (#19491)

Co-authored-by: Alexey Shekhirin <5773434+shekhirin@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2025-11-04 18:52:09 +01:00
committed by GitHub
parent 29761637ef
commit fdcc540492

View File

@@ -363,7 +363,7 @@ where
) -> Result<Vec<LocalizedTransactionTrace>, Eth::Error> {
// We'll reuse the matcher across multiple blocks that are traced in parallel
let matcher = Arc::new(filter.matcher());
let TraceFilter { from_block, to_block, after, count, .. } = filter;
let TraceFilter { from_block, to_block, mut after, count, .. } = filter;
let start = from_block.unwrap_or(0);
let latest_block = self.provider().best_block_number().map_err(Eth::Error::from_eth_err)?;
@@ -389,81 +389,98 @@ where
.into())
}
// fetch all blocks in that range
let blocks = self
.provider()
.recovered_block_range(start..=end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let mut all_traces = Vec::new();
let mut block_traces = Vec::with_capacity(self.inner.eth_config.max_tracing_requests);
for chunk_start in (start..end).step_by(self.inner.eth_config.max_tracing_requests) {
let chunk_end =
std::cmp::min(chunk_start + self.inner.eth_config.max_tracing_requests as u64, end);
// trace all blocks
let mut block_traces = Vec::with_capacity(blocks.len());
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
block_traces.push(traces);
}
// fetch all blocks in that chunk
let blocks = self
.eth_api()
.spawn_blocking_io(move |this| {
Ok(this
.provider()
.recovered_block_range(chunk_start..=chunk_end)
.map_err(Eth::Error::from_eth_err)?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>())
})
.await?;
let block_traces = futures::future::try_join_all(block_traces).await?;
let mut all_traces = block_traces
.into_iter()
.flatten()
.flat_map(|traces| traces.into_iter().flatten().flat_map(|traces| traces.into_iter()))
.collect::<Vec<_>>();
// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
// trace all blocks
for block in &blocks {
let matcher = matcher.clone();
let traces = self.eth_api().trace_block_until(
block.hash().into(),
Some(block.clone()),
None,
TracingInspectorConfig::default_parity(),
move |tx_info, mut ctx| {
let mut traces = ctx
.take_inspector()
.into_parity_builder()
.into_localized_transaction_traces(tx_info);
traces.retain(|trace| matcher.matches(&trace.trace));
Ok(Some(traces))
},
);
} else {
// no block reward, means we're past the Paris hardfork and don't expect any rewards
// because the blocks in ascending order
break
block_traces.push(traces);
}
#[allow(clippy::iter_with_drain)]
let block_traces = futures::future::try_join_all(block_traces.drain(..)).await?;
all_traces.extend(block_traces.into_iter().flatten().flat_map(|traces| {
traces.into_iter().flatten().flat_map(|traces| traces.into_iter())
}));
// add reward traces for all blocks
for block in &blocks {
if let Some(base_block_reward) = self.calculate_base_block_reward(block.header())? {
all_traces.extend(
self.extract_reward_traces(
block.header(),
block.body().ommers(),
base_block_reward,
)
.into_iter()
.filter(|trace| matcher.matches(&trace.trace)),
);
} else {
// no block reward, means we're past the Paris hardfork and don't expect any
// rewards because the blocks in ascending order
break
}
}
// Skips the first `after` number of matching traces.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff < all_traces.len()
{
all_traces.drain(..cutoff);
// we removed the first `after` traces
after = None;
}
// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
return Ok(all_traces)
}
};
}
// Skips the first `after` number of matching traces.
// If `after` is greater than or equal to the number of matched traces, it returns an empty
// array.
if let Some(after) = after.map(|a| a as usize) {
if after < all_traces.len() {
all_traces.drain(..after);
} else {
return Ok(vec![])
}
// If `after` is greater than or equal to the number of matched traces, it returns an
// empty array.
if let Some(cutoff) = after.map(|a| a as usize) &&
cutoff >= all_traces.len()
{
return Ok(vec![])
}
// Return at most `count` of traces
if let Some(count) = count {
let count = count as usize;
if count < all_traces.len() {
all_traces.truncate(count);
}
};
Ok(all_traces)
}
@@ -692,6 +709,7 @@ where
/// # Limitations
/// This currently requires block filter fields, since reth does not have address indices yet.
async fn trace_filter(&self, filter: TraceFilter) -> RpcResult<Vec<LocalizedTransactionTrace>> {
let _permit = self.inner.blocking_task_guard.clone().acquire_many_owned(2).await;
Ok(Self::trace_filter(self, filter).await.map_err(Into::into)?)
}