mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-04-30 03:01:58 -04:00
Compare commits
2 Commits
performanc
...
klkvr/batc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1433fd806b | ||
|
|
4d074341e9 |
@@ -1218,9 +1218,7 @@ impl ReverseHeadersDownloaderBuilder {
|
||||
next_request_block_number: 0,
|
||||
next_chain_tip_block_number: 0,
|
||||
lowest_validated_header: None,
|
||||
// TODO(mattsse): tmp hotfix to prevent issues with syncing from besu which has an upper
|
||||
// limit of 512
|
||||
request_limit: request_limit.min(512),
|
||||
request_limit,
|
||||
min_concurrent_requests,
|
||||
max_concurrent_requests,
|
||||
stream_batch_size,
|
||||
|
||||
@@ -995,14 +995,15 @@ where
|
||||
.additional_validation_tasks
|
||||
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
|
||||
)
|
||||
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
|
||||
.into_tasks_builder(blob_store.clone())
|
||||
.map(|validator| {
|
||||
OpTransactionValidator::new(validator)
|
||||
// In --dev mode we can't require gas fees because we're unable to decode
|
||||
// the L1 block info
|
||||
.require_l1_data_gas_fee(!ctx.config().dev.dev)
|
||||
.with_supervisor(supervisor_client.clone())
|
||||
});
|
||||
})
|
||||
.build_and_spawn(ctx.task_executor().clone());
|
||||
|
||||
let final_pool_config = pool_config_overrides.apply(ctx.pool_config());
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ futures-util.workspace = true
|
||||
parking_lot.workspace = true
|
||||
pin-project.workspace = true
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tokio-stream.workspace = true
|
||||
|
||||
# metrics
|
||||
reth-metrics.workspace = true
|
||||
@@ -74,6 +73,7 @@ tempfile.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread"] }
|
||||
futures.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
[features]
|
||||
serde = [
|
||||
|
||||
@@ -148,7 +148,7 @@ pub struct TxPoolValidationMetrics {
|
||||
}
|
||||
|
||||
/// Transaction pool validator task metrics
|
||||
#[derive(Metrics)]
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "transaction_pool")]
|
||||
pub struct TxPoolValidatorMetrics {
|
||||
/// Number of in-flight validation job sends waiting for channel capacity
|
||||
|
||||
@@ -8,10 +8,12 @@ use crate::{
|
||||
},
|
||||
metrics::TxPoolValidationMetrics,
|
||||
traits::TransactionOrigin,
|
||||
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_BYTE_SIZE},
|
||||
validate::{
|
||||
task::TransactionValidationTaskExecutorBuilder, ValidTransaction, MAX_INIT_CODE_BYTE_SIZE,
|
||||
},
|
||||
Address, BlobTransactionSidecarVariant, EthBlobTransactionSidecar, EthPoolTransaction,
|
||||
LocalTransactionConfig, TransactionValidationOutcome, TransactionValidationTaskExecutor,
|
||||
TransactionValidator,
|
||||
LocalTransactionConfig, PoolTransaction, TransactionValidationOutcome,
|
||||
TransactionValidationTaskExecutor, TransactionValidator,
|
||||
};
|
||||
|
||||
use alloy_consensus::{
|
||||
@@ -41,7 +43,6 @@ use std::{
|
||||
},
|
||||
time::{Instant, SystemTime},
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
/// A [`TransactionValidator`] implementation that validates ethereum transaction.
|
||||
///
|
||||
@@ -1164,6 +1165,19 @@ impl<Client> EthTransactionValidatorBuilder<Client> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a [`TransactionValidationTaskExecutorBuilder`].
|
||||
pub fn into_tasks_builder<Tx, S>(
|
||||
self,
|
||||
blob_store: S,
|
||||
) -> TransactionValidationTaskExecutorBuilder<EthTransactionValidator<Client, Tx>>
|
||||
where
|
||||
S: BlobStore,
|
||||
{
|
||||
let additional_tasks = self.additional_tasks;
|
||||
let validator = self.build(blob_store);
|
||||
TransactionValidationTaskExecutorBuilder::new(validator, additional_tasks)
|
||||
}
|
||||
|
||||
/// Builds a [`EthTransactionValidator`] and spawns validation tasks via the
|
||||
/// [`TransactionValidationTaskExecutor`]
|
||||
///
|
||||
@@ -1178,32 +1192,10 @@ impl<Client> EthTransactionValidatorBuilder<Client> {
|
||||
where
|
||||
T: TaskSpawner,
|
||||
S: BlobStore,
|
||||
Tx: PoolTransaction,
|
||||
EthTransactionValidator<Client, Tx>: TransactionValidator<Transaction = Tx> + 'static,
|
||||
{
|
||||
let additional_tasks = self.additional_tasks;
|
||||
let validator = self.build(blob_store);
|
||||
|
||||
let (tx, task) = ValidationTask::new();
|
||||
|
||||
// Spawn validation tasks, they are blocking because they perform db lookups
|
||||
for _ in 0..additional_tasks {
|
||||
let task = task.clone();
|
||||
tasks.spawn_blocking(Box::pin(async move {
|
||||
task.run().await;
|
||||
}));
|
||||
}
|
||||
|
||||
// we spawn them on critical tasks because validation, especially for EIP-4844 can be quite
|
||||
// heavy
|
||||
tasks.spawn_critical_blocking(
|
||||
"transaction-validation-service",
|
||||
Box::pin(async move {
|
||||
task.run().await;
|
||||
}),
|
||||
);
|
||||
|
||||
let to_validation_task = Arc::new(Mutex::new(tx));
|
||||
|
||||
TransactionValidationTaskExecutor { validator: Arc::new(validator), to_validation_task }
|
||||
self.into_tasks_builder(blob_store).build_and_spawn(tasks)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,142 +4,142 @@ use crate::{
|
||||
blobstore::BlobStore,
|
||||
metrics::TxPoolValidatorMetrics,
|
||||
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
|
||||
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
|
||||
TransactionValidator,
|
||||
EthPoolTransaction, EthPooledTransaction, EthTransactionValidator, PoolTransaction,
|
||||
TransactionOrigin, TransactionValidationOutcome, TransactionValidator,
|
||||
};
|
||||
use futures_util::{lock::Mutex, StreamExt};
|
||||
use futures_util::lock::Mutex;
|
||||
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
|
||||
use reth_primitives_traits::{Block, SealedBlock};
|
||||
use reth_storage_api::{noop::NoopProvider, StateProviderFactory};
|
||||
use reth_tasks::TaskSpawner;
|
||||
use std::{future::Future, pin::Pin, sync::Arc};
|
||||
use tokio::{
|
||||
sync,
|
||||
sync::{mpsc, oneshot},
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
/// Represents a future outputting unit type and is sendable.
|
||||
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
|
||||
|
||||
/// Represents a stream of validation futures.
|
||||
type ValidationStream = ReceiverStream<ValidationFuture>;
|
||||
|
||||
/// A service that performs validation jobs.
|
||||
/// A validation job for a single transaction.
|
||||
///
|
||||
/// This listens for incoming validation jobs and executes them.
|
||||
/// Contains the transaction to validate, its origin, and a channel to send the result back.
|
||||
type ValidationJob<T> = (TransactionOrigin, T, oneshot::Sender<TransactionValidationOutcome<T>>);
|
||||
|
||||
/// A service that performs transaction validation jobs.
|
||||
///
|
||||
/// This listens for incoming transactions, batches them using `recv_many`,
|
||||
/// and validates them in batches for improved performance.
|
||||
///
|
||||
/// This should be spawned as a task: [`ValidationTask::run`]
|
||||
#[derive(Clone)]
|
||||
pub struct ValidationTask {
|
||||
validation_jobs: Arc<Mutex<ValidationStream>>,
|
||||
#[derive(Debug)]
|
||||
pub struct ValidationTask<V: TransactionValidator> {
|
||||
/// Shared receiver for validation jobs - multiple tasks can compete for work.
|
||||
validation_jobs: Arc<Mutex<mpsc::UnboundedReceiver<ValidationJob<V::Transaction>>>>,
|
||||
/// The validator used to validate transactions.
|
||||
validator: Arc<V>,
|
||||
/// Metrics for the validation task.
|
||||
metrics: TxPoolValidatorMetrics,
|
||||
}
|
||||
|
||||
impl ValidationTask {
|
||||
/// Creates a new cloneable task pair.
|
||||
///
|
||||
/// The sender sends new (transaction) validation tasks to an available validation task.
|
||||
pub fn new() -> (ValidationJobSender, Self) {
|
||||
Self::with_capacity(1)
|
||||
}
|
||||
|
||||
/// Creates a new cloneable task pair with the given channel capacity.
|
||||
pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
|
||||
let (tx, rx) = mpsc::channel(capacity);
|
||||
let metrics = TxPoolValidatorMetrics::default();
|
||||
(ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
|
||||
}
|
||||
|
||||
/// Creates a new task with the given receiver.
|
||||
pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
|
||||
Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
|
||||
}
|
||||
|
||||
/// Executes all new validation jobs that come in.
|
||||
///
|
||||
/// This will run as long as the channel is alive and is expected to be spawned as a task.
|
||||
pub async fn run(self) {
|
||||
while let Some(task) = self.validation_jobs.lock().await.next().await {
|
||||
task.await;
|
||||
impl<V: TransactionValidator> Clone for ValidationTask<V> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
validator: self.validator.clone(),
|
||||
validation_jobs: self.validation_jobs.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ValidationTask {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
|
||||
impl<V: TransactionValidator> ValidationTask<V> {
|
||||
/// Maximum number of transactions to batch in a single validation call.
|
||||
const BATCH_SIZE: usize = 64;
|
||||
|
||||
/// Creates a new cloneable task pair.
|
||||
///
|
||||
/// The sender sends new (transaction) validation tasks to an available validation task.
|
||||
pub fn new(
|
||||
validator: Arc<V>,
|
||||
metrics: TxPoolValidatorMetrics,
|
||||
) -> (mpsc::UnboundedSender<ValidationJob<V::Transaction>>, Self) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
(tx, Self { validator, validation_jobs: Arc::new(Mutex::new(rx)), metrics })
|
||||
}
|
||||
}
|
||||
|
||||
/// A sender new type for sending validation jobs to [`ValidationTask`].
|
||||
#[derive(Debug)]
|
||||
pub struct ValidationJobSender {
|
||||
tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
metrics: TxPoolValidatorMetrics,
|
||||
}
|
||||
/// Executes validation jobs, batching transactions for efficiency.
|
||||
///
|
||||
/// This will run as long as the channel is alive and is expected to be spawned as a task.
|
||||
pub async fn run(self) {
|
||||
let mut buffer = Vec::with_capacity(Self::BATCH_SIZE);
|
||||
|
||||
impl ValidationJobSender {
|
||||
/// Sends the given job to the validation task.
|
||||
pub async fn send(
|
||||
&self,
|
||||
job: Pin<Box<dyn Future<Output = ()> + Send>>,
|
||||
) -> Result<(), TransactionValidatorError> {
|
||||
self.metrics.inflight_validation_jobs.increment(1);
|
||||
let res = self
|
||||
.tx
|
||||
.send(job)
|
||||
.await
|
||||
.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
|
||||
self.metrics.inflight_validation_jobs.decrement(1);
|
||||
res
|
||||
loop {
|
||||
// Lock the receiver and batch receive transactions
|
||||
let count =
|
||||
self.validation_jobs.lock().await.recv_many(&mut buffer, Self::BATCH_SIZE).await;
|
||||
|
||||
if count == 0 {
|
||||
// Channel closed, exit
|
||||
break;
|
||||
}
|
||||
|
||||
self.metrics.inflight_validation_jobs.decrement(count as f64);
|
||||
|
||||
// Split into transactions and response senders
|
||||
#[expect(clippy::iter_with_drain)]
|
||||
let (txs, senders): (Vec<_>, Vec<_>) =
|
||||
buffer.drain(..).map(|(origin, tx, sender)| ((origin, tx), sender)).unzip();
|
||||
|
||||
// Batch validate all transactions
|
||||
let results = self.validator.validate_transactions(txs).await;
|
||||
|
||||
// Send results back through oneshot channels
|
||||
for (result, sender) in results.into_iter().zip(senders) {
|
||||
let _ = sender.send(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`TransactionValidator`] implementation that validates ethereum transaction.
|
||||
/// This validator is non-blocking, all validation work is done in a separate task.
|
||||
#[derive(Debug)]
|
||||
pub struct TransactionValidationTaskExecutor<V> {
|
||||
pub struct TransactionValidationTaskExecutor<V: TransactionValidator> {
|
||||
/// The validator that will validate transactions on a separate task.
|
||||
pub validator: Arc<V>,
|
||||
/// The sender half to validation tasks that perform the actual validation.
|
||||
pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
|
||||
pub to_validation_task: mpsc::UnboundedSender<ValidationJob<V::Transaction>>,
|
||||
/// Metrics for the validator task executor.
|
||||
pub metrics: TxPoolValidatorMetrics,
|
||||
}
|
||||
|
||||
impl<V> Clone for TransactionValidationTaskExecutor<V> {
|
||||
impl<V: TransactionValidator> Clone for TransactionValidationTaskExecutor<V> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
validator: self.validator.clone(),
|
||||
to_validation_task: self.to_validation_task.clone(),
|
||||
metrics: self.metrics.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl TransactionValidationTaskExecutor ===
|
||||
|
||||
impl TransactionValidationTaskExecutor<()> {
|
||||
impl
|
||||
TransactionValidationTaskExecutor<EthTransactionValidator<NoopProvider, EthPooledTransaction>>
|
||||
{
|
||||
/// Convenience method to create a [`EthTransactionValidatorBuilder`]
|
||||
pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
|
||||
EthTransactionValidatorBuilder::new(client)
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> TransactionValidationTaskExecutor<V> {
|
||||
/// Maps the given validator to a new type.
|
||||
pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
|
||||
where
|
||||
F: FnMut(V) -> T,
|
||||
{
|
||||
TransactionValidationTaskExecutor {
|
||||
validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
|
||||
to_validation_task: self.to_validation_task,
|
||||
}
|
||||
}
|
||||
|
||||
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
|
||||
/// Returns the validator.
|
||||
pub fn validator(&self) -> &V {
|
||||
&self.validator
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
|
||||
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
|
||||
where
|
||||
Client: ChainSpecProvider<ChainSpec: EthereumHardforks> + StateProviderFactory + 'static,
|
||||
Tx: EthPoolTransaction,
|
||||
{
|
||||
/// Creates a new instance for the given client
|
||||
///
|
||||
/// This will spawn a single validation tasks that performs the actual validation.
|
||||
@@ -175,20 +175,16 @@ impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Clien
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> TransactionValidationTaskExecutor<V> {
|
||||
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
|
||||
/// Creates a new executor instance with the given validator for transaction validation.
|
||||
///
|
||||
/// Initializes the executor with the provided validator and sets up communication for
|
||||
/// validation tasks.
|
||||
pub fn new(validator: V) -> (Self, ValidationTask) {
|
||||
let (tx, task) = ValidationTask::new();
|
||||
(
|
||||
Self {
|
||||
validator: Arc::new(validator),
|
||||
to_validation_task: Arc::new(sync::Mutex::new(tx)),
|
||||
},
|
||||
task,
|
||||
)
|
||||
pub fn new(validator: V) -> (Self, ValidationTask<V>) {
|
||||
let validator = Arc::new(validator);
|
||||
let metrics = TxPoolValidatorMetrics::default();
|
||||
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
|
||||
(Self { validator, to_validation_task, metrics }, task)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +192,7 @@ impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
|
||||
where
|
||||
V: TransactionValidator + 'static,
|
||||
{
|
||||
type Transaction = <V as TransactionValidator>::Transaction;
|
||||
type Transaction = V::Transaction;
|
||||
|
||||
async fn validate_transaction(
|
||||
&self,
|
||||
@@ -205,75 +201,72 @@ where
|
||||
) -> TransactionValidationOutcome<Self::Transaction> {
|
||||
let hash = *transaction.hash();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
let res = {
|
||||
let to_validation_task = self.to_validation_task.clone();
|
||||
let validator = self.validator.clone();
|
||||
let fut = Box::pin(async move {
|
||||
let res = validator.validate_transaction(origin, transaction).await;
|
||||
let _ = tx.send(res);
|
||||
});
|
||||
let to_validation_task = to_validation_task.lock().await;
|
||||
to_validation_task.send(fut).await
|
||||
};
|
||||
if res.is_err() {
|
||||
return TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(_) => TransactionValidationOutcome::Error(
|
||||
self.metrics.inflight_validation_jobs.increment(1);
|
||||
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
|
||||
return TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
// Wait for the result
|
||||
rx.await.unwrap_or_else(|_| {
|
||||
TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
async fn validate_transactions(
|
||||
&self,
|
||||
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
|
||||
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
|
||||
let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
let res = {
|
||||
let to_validation_task = self.to_validation_task.clone();
|
||||
let validator = self.validator.clone();
|
||||
let fut = Box::pin(async move {
|
||||
let res = validator.validate_transactions(transactions).await;
|
||||
let _ = tx.send(res);
|
||||
});
|
||||
let to_validation_task = to_validation_task.lock().await;
|
||||
to_validation_task.send(fut).await
|
||||
};
|
||||
if res.is_err() {
|
||||
let len = transactions.len();
|
||||
if len == 0 {
|
||||
return Vec::new();
|
||||
}
|
||||
|
||||
// Create oneshot channels for all transactions
|
||||
let mut receivers = Vec::with_capacity(len);
|
||||
let mut hashes = Vec::with_capacity(len);
|
||||
|
||||
for (origin, transaction) in transactions {
|
||||
let hash = *transaction.hash();
|
||||
hashes.push(hash);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
receivers.push((hash, rx));
|
||||
|
||||
self.metrics.inflight_validation_jobs.increment(1);
|
||||
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
|
||||
return hashes
|
||||
.into_iter()
|
||||
.map(|hash| {
|
||||
.map(|h| {
|
||||
TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
h,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
}
|
||||
}
|
||||
match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(_) => hashes
|
||||
.into_iter()
|
||||
.map(|hash| {
|
||||
TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
|
||||
// Collect all results
|
||||
let mut results = Vec::with_capacity(len);
|
||||
for (hash, rx) in receivers {
|
||||
let result = match rx.await {
|
||||
Ok(res) => res,
|
||||
Err(_) => TransactionValidationOutcome::Error(
|
||||
hash,
|
||||
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
||||
),
|
||||
};
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
async fn validate_transactions_with_origin(
|
||||
@@ -292,6 +285,58 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder for [`TransactionValidationTaskExecutor`].
|
||||
///
|
||||
/// This builder holds a validator and configuration for spawning validation tasks.
|
||||
/// Use [`Self::map`] to wrap the validator before spawning tasks.
|
||||
#[derive(Debug)]
|
||||
pub struct TransactionValidationTaskExecutorBuilder<V> {
|
||||
/// The validator to use for transaction validation.
|
||||
validator: V,
|
||||
/// Number of additional validation tasks to spawn.
|
||||
additional_tasks: usize,
|
||||
}
|
||||
|
||||
impl<V> TransactionValidationTaskExecutorBuilder<V> {
|
||||
/// Creates a new builder with the given validator and number of additional tasks.
|
||||
pub const fn new(validator: V, additional_tasks: usize) -> Self {
|
||||
Self { validator, additional_tasks }
|
||||
}
|
||||
|
||||
/// Maps the given validator to a new type.
|
||||
pub fn map<F, T>(self, f: F) -> TransactionValidationTaskExecutorBuilder<T>
|
||||
where
|
||||
F: FnOnce(V) -> T,
|
||||
{
|
||||
TransactionValidationTaskExecutorBuilder {
|
||||
validator: f(self.validator),
|
||||
additional_tasks: self.additional_tasks,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns validation tasks and returns the executor.
|
||||
pub fn build_and_spawn<S>(self, spawner: S) -> TransactionValidationTaskExecutor<V>
|
||||
where
|
||||
V: TransactionValidator + 'static,
|
||||
S: TaskSpawner,
|
||||
{
|
||||
let Self { validator, additional_tasks } = self;
|
||||
let validator = Arc::new(validator);
|
||||
let metrics = TxPoolValidatorMetrics::default();
|
||||
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
|
||||
|
||||
// Spawn additional validation tasks
|
||||
for _ in 0..additional_tasks {
|
||||
spawner.spawn_blocking(Box::pin(task.clone().run()));
|
||||
}
|
||||
|
||||
// Spawn the critical validation task
|
||||
spawner.spawn_critical_blocking("transaction-validation-service", Box::pin(task.run()));
|
||||
|
||||
TransactionValidationTaskExecutor { validator, to_validation_task, metrics }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user