perf: spawn range query on blocking (#16434)

This commit is contained in:
Matthias Seitz
2025-05-23 13:04:19 +02:00
committed by GitHub
parent 22a69277b7
commit b76d4f6617

View File

@@ -35,7 +35,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::{
sync::{mpsc::Receiver, Mutex},
sync::{mpsc::Receiver, oneshot, Mutex},
time::MissedTickBehavior,
};
use tracing::{error, trace};
@@ -51,7 +51,7 @@ where
limits: QueryLimits,
) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
trace!(target: "rpc::eth", "Serving eth_getLogs");
self.inner.logs_for_filter(filter, limits).map_err(|e| e.into())
self.logs_for_filter(filter, limits).map_err(|e| e.into())
}
}
@@ -169,7 +169,7 @@ where
impl<Eth> EthFilter<Eth>
where
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
@@ -244,8 +244,9 @@ where
};
let logs = self
.inner
.clone()
.get_logs_in_block_range(
&filter,
*filter,
from_block_number,
to_block_number,
self.inner.query_limits,
@@ -274,7 +275,16 @@ where
}
};
self.inner.logs_for_filter(filter, self.inner.query_limits).await
self.logs_for_filter(filter, self.inner.query_limits).await
}
/// Returns logs matching given filter object.
async fn logs_for_filter(
&self,
filter: Filter,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
self.inner.clone().logs_for_filter(filter, limits).await
}
}
@@ -364,7 +374,7 @@ where
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
Ok(self.inner.logs_for_filter(filter, self.inner.query_limits).await?)
Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
}
}
@@ -398,7 +408,7 @@ struct EthFilterInner<Eth: EthApiTypes> {
impl<Eth> EthFilterInner<Eth>
where
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
@@ -414,7 +424,7 @@ where
/// Returns logs matching given filter object.
async fn logs_for_filter(
&self,
self: Arc<Self>,
filter: Filter,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
@@ -468,7 +478,7 @@ where
.flatten();
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number, limits)
self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
.await
}
}
@@ -504,14 +514,15 @@ where
/// - underlying database error
/// - amount of matches exceeds configured limit
async fn get_logs_in_block_range(
&self,
filter: &Filter,
self: Arc<Self>,
filter: Filter,
from_block: u64,
to_block: u64,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
// perform boundary checks first
if to_block < from_block {
return Err(EthFilterError::InvalidBlockRangeParams)
}
@@ -522,6 +533,32 @@ where
return Err(EthFilterError::QueryExceedsMaxBlocks(max_blocks_per_filter))
}
let (tx, rx) = oneshot::channel();
let this = self.clone();
self.task_spawner.spawn_blocking(Box::pin(async move {
let res =
this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthFilterError::InternalError)?
}
/// Returns all logs in the given _inclusive_ range that match the filter
///
/// Note: This function uses a mix of blocking db operations for fetching indices and header
/// ranges and utilizes the rpc cache for optimistically fetching receipts and blocks.
/// This function is considered blocking and should thus be spawned on a blocking task.
///
/// Returns an error if:
/// - underlying database error
async fn get_logs_in_block_range_inner(
&self,
filter: &Filter,
from_block: u64,
to_block: u64,
limits: QueryLimits,
) -> Result<Vec<Log>, EthFilterError> {
let mut all_logs = Vec::new();
// loop over the range of new blocks and check logs if the filter matches the log's bloom