|
|
|
|
@@ -4,142 +4,142 @@ use crate::{
|
|
|
|
|
blobstore::BlobStore,
|
|
|
|
|
metrics::TxPoolValidatorMetrics,
|
|
|
|
|
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
|
|
|
|
|
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
|
|
|
|
|
TransactionValidator,
|
|
|
|
|
EthPoolTransaction, EthPooledTransaction, EthTransactionValidator, PoolTransaction,
|
|
|
|
|
TransactionOrigin, TransactionValidationOutcome, TransactionValidator,
|
|
|
|
|
};
|
|
|
|
|
use futures_util::{lock::Mutex, StreamExt};
|
|
|
|
|
use futures_util::lock::Mutex;
|
|
|
|
|
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
|
|
|
|
|
use reth_primitives_traits::{Block, SealedBlock};
|
|
|
|
|
use reth_storage_api::{noop::NoopProvider, StateProviderFactory};
|
|
|
|
|
use reth_tasks::TaskSpawner;
|
|
|
|
|
use std::{future::Future, pin::Pin, sync::Arc};
|
|
|
|
|
use tokio::{
|
|
|
|
|
sync,
|
|
|
|
|
sync::{mpsc, oneshot},
|
|
|
|
|
};
|
|
|
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
|
|
|
|
|
|
/// Represents a future outputting unit type and is sendable.
|
|
|
|
|
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
|
|
|
|
|
|
|
|
|
|
/// Represents a stream of validation futures.
|
|
|
|
|
type ValidationStream = ReceiverStream<ValidationFuture>;
|
|
|
|
|
|
|
|
|
|
/// A service that performs validation jobs.
|
|
|
|
|
/// A validation job for a single transaction.
|
|
|
|
|
///
|
|
|
|
|
/// This listens for incoming validation jobs and executes them.
|
|
|
|
|
/// Contains the transaction to validate, its origin, and a channel to send the result back.
|
|
|
|
|
type ValidationJob<T> = (TransactionOrigin, T, oneshot::Sender<TransactionValidationOutcome<T>>);
|
|
|
|
|
|
|
|
|
|
/// A service that performs transaction validation jobs.
|
|
|
|
|
///
|
|
|
|
|
/// This listens for incoming transactions, batches them using `recv_many`,
|
|
|
|
|
/// and validates them in batches for improved performance.
|
|
|
|
|
///
|
|
|
|
|
/// This should be spawned as a task: [`ValidationTask::run`]
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct ValidationTask {
|
|
|
|
|
validation_jobs: Arc<Mutex<ValidationStream>>,
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct ValidationTask<V: TransactionValidator> {
|
|
|
|
|
/// Shared receiver for validation jobs - multiple tasks can compete for work.
|
|
|
|
|
validation_jobs: Arc<Mutex<mpsc::UnboundedReceiver<ValidationJob<V::Transaction>>>>,
|
|
|
|
|
/// The validator used to validate transactions.
|
|
|
|
|
validator: Arc<V>,
|
|
|
|
|
/// Metrics for the validation task.
|
|
|
|
|
metrics: TxPoolValidatorMetrics,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ValidationTask {
|
|
|
|
|
/// Creates a new cloneable task pair.
|
|
|
|
|
///
|
|
|
|
|
/// The sender sends new (transaction) validation tasks to an available validation task.
|
|
|
|
|
pub fn new() -> (ValidationJobSender, Self) {
|
|
|
|
|
Self::with_capacity(1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Creates a new cloneable task pair with the given channel capacity.
|
|
|
|
|
pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
|
|
|
|
|
let (tx, rx) = mpsc::channel(capacity);
|
|
|
|
|
let metrics = TxPoolValidatorMetrics::default();
|
|
|
|
|
(ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Creates a new task with the given receiver.
|
|
|
|
|
pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
|
|
|
|
|
Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Executes all new validation jobs that come in.
|
|
|
|
|
///
|
|
|
|
|
/// This will run as long as the channel is alive and is expected to be spawned as a task.
|
|
|
|
|
pub async fn run(self) {
|
|
|
|
|
while let Some(task) = self.validation_jobs.lock().await.next().await {
|
|
|
|
|
task.await;
|
|
|
|
|
impl<V: TransactionValidator> Clone for ValidationTask<V> {
|
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
validator: self.validator.clone(),
|
|
|
|
|
validation_jobs: self.validation_jobs.clone(),
|
|
|
|
|
metrics: self.metrics.clone(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl std::fmt::Debug for ValidationTask {
|
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
|
f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
|
|
|
|
|
impl<V: TransactionValidator> ValidationTask<V> {
|
|
|
|
|
/// Maximum number of transactions to batch in a single validation call.
|
|
|
|
|
const BATCH_SIZE: usize = 64;
|
|
|
|
|
|
|
|
|
|
/// Creates a new cloneable task pair.
|
|
|
|
|
///
|
|
|
|
|
/// The sender sends new (transaction) validation tasks to an available validation task.
|
|
|
|
|
pub fn new(
|
|
|
|
|
validator: Arc<V>,
|
|
|
|
|
metrics: TxPoolValidatorMetrics,
|
|
|
|
|
) -> (mpsc::UnboundedSender<ValidationJob<V::Transaction>>, Self) {
|
|
|
|
|
let (tx, rx) = mpsc::unbounded_channel();
|
|
|
|
|
(tx, Self { validator, validation_jobs: Arc::new(Mutex::new(rx)), metrics })
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A sender new type for sending validation jobs to [`ValidationTask`].
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct ValidationJobSender {
|
|
|
|
|
tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
|
|
|
|
metrics: TxPoolValidatorMetrics,
|
|
|
|
|
}
|
|
|
|
|
/// Executes validation jobs, batching transactions for efficiency.
|
|
|
|
|
///
|
|
|
|
|
/// This will run as long as the channel is alive and is expected to be spawned as a task.
|
|
|
|
|
pub async fn run(self) {
|
|
|
|
|
let mut buffer = Vec::with_capacity(Self::BATCH_SIZE);
|
|
|
|
|
|
|
|
|
|
impl ValidationJobSender {
|
|
|
|
|
/// Sends the given job to the validation task.
|
|
|
|
|
pub async fn send(
|
|
|
|
|
&self,
|
|
|
|
|
job: Pin<Box<dyn Future<Output = ()> + Send>>,
|
|
|
|
|
) -> Result<(), TransactionValidatorError> {
|
|
|
|
|
self.metrics.inflight_validation_jobs.increment(1);
|
|
|
|
|
let res = self
|
|
|
|
|
.tx
|
|
|
|
|
.send(job)
|
|
|
|
|
.await
|
|
|
|
|
.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
|
|
|
|
|
self.metrics.inflight_validation_jobs.decrement(1);
|
|
|
|
|
res
|
|
|
|
|
loop {
|
|
|
|
|
// Lock the receiver and batch receive transactions
|
|
|
|
|
let count =
|
|
|
|
|
self.validation_jobs.lock().await.recv_many(&mut buffer, Self::BATCH_SIZE).await;
|
|
|
|
|
|
|
|
|
|
if count == 0 {
|
|
|
|
|
// Channel closed, exit
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
self.metrics.inflight_validation_jobs.decrement(count as f64);
|
|
|
|
|
|
|
|
|
|
// Split into transactions and response senders
|
|
|
|
|
#[expect(clippy::iter_with_drain)]
|
|
|
|
|
let (txs, senders): (Vec<_>, Vec<_>) =
|
|
|
|
|
buffer.drain(..).map(|(origin, tx, sender)| ((origin, tx), sender)).unzip();
|
|
|
|
|
|
|
|
|
|
// Batch validate all transactions
|
|
|
|
|
let results = self.validator.validate_transactions(txs).await;
|
|
|
|
|
|
|
|
|
|
// Send results back through oneshot channels
|
|
|
|
|
for (result, sender) in results.into_iter().zip(senders) {
|
|
|
|
|
let _ = sender.send(result);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A [`TransactionValidator`] implementation that validates ethereum transaction.
|
|
|
|
|
/// This validator is non-blocking, all validation work is done in a separate task.
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct TransactionValidationTaskExecutor<V> {
|
|
|
|
|
pub struct TransactionValidationTaskExecutor<V: TransactionValidator> {
|
|
|
|
|
/// The validator that will validate transactions on a separate task.
|
|
|
|
|
pub validator: Arc<V>,
|
|
|
|
|
/// The sender half to validation tasks that perform the actual validation.
|
|
|
|
|
pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
|
|
|
|
|
pub to_validation_task: mpsc::UnboundedSender<ValidationJob<V::Transaction>>,
|
|
|
|
|
/// Metrics for the validator task executor.
|
|
|
|
|
pub metrics: TxPoolValidatorMetrics,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<V> Clone for TransactionValidationTaskExecutor<V> {
|
|
|
|
|
impl<V: TransactionValidator> Clone for TransactionValidationTaskExecutor<V> {
|
|
|
|
|
fn clone(&self) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
validator: self.validator.clone(),
|
|
|
|
|
to_validation_task: self.to_validation_task.clone(),
|
|
|
|
|
metrics: self.metrics.clone(),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// === impl TransactionValidationTaskExecutor ===
|
|
|
|
|
|
|
|
|
|
impl TransactionValidationTaskExecutor<()> {
|
|
|
|
|
impl
|
|
|
|
|
TransactionValidationTaskExecutor<EthTransactionValidator<NoopProvider, EthPooledTransaction>>
|
|
|
|
|
{
|
|
|
|
|
/// Convenience method to create a [`EthTransactionValidatorBuilder`]
|
|
|
|
|
pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
|
|
|
|
|
EthTransactionValidatorBuilder::new(client)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<V> TransactionValidationTaskExecutor<V> {
|
|
|
|
|
/// Maps the given validator to a new type.
|
|
|
|
|
pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
|
|
|
|
|
where
|
|
|
|
|
F: FnMut(V) -> T,
|
|
|
|
|
{
|
|
|
|
|
TransactionValidationTaskExecutor {
|
|
|
|
|
validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
|
|
|
|
|
to_validation_task: self.to_validation_task,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
|
|
|
|
|
/// Returns the validator.
|
|
|
|
|
pub fn validator(&self) -> &V {
|
|
|
|
|
&self.validator
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
|
|
|
|
|
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
|
|
|
|
|
where
|
|
|
|
|
Client: ChainSpecProvider<ChainSpec: EthereumHardforks> + StateProviderFactory + 'static,
|
|
|
|
|
Tx: EthPoolTransaction,
|
|
|
|
|
{
|
|
|
|
|
/// Creates a new instance for the given client
|
|
|
|
|
///
|
|
|
|
|
/// This will spawn a single validation tasks that performs the actual validation.
|
|
|
|
|
@@ -175,20 +175,16 @@ impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Clien
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<V> TransactionValidationTaskExecutor<V> {
|
|
|
|
|
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
|
|
|
|
|
/// Creates a new executor instance with the given validator for transaction validation.
|
|
|
|
|
///
|
|
|
|
|
/// Initializes the executor with the provided validator and sets up communication for
|
|
|
|
|
/// validation tasks.
|
|
|
|
|
pub fn new(validator: V) -> (Self, ValidationTask) {
|
|
|
|
|
let (tx, task) = ValidationTask::new();
|
|
|
|
|
(
|
|
|
|
|
Self {
|
|
|
|
|
validator: Arc::new(validator),
|
|
|
|
|
to_validation_task: Arc::new(sync::Mutex::new(tx)),
|
|
|
|
|
},
|
|
|
|
|
task,
|
|
|
|
|
)
|
|
|
|
|
pub fn new(validator: V) -> (Self, ValidationTask<V>) {
|
|
|
|
|
let validator = Arc::new(validator);
|
|
|
|
|
let metrics = TxPoolValidatorMetrics::default();
|
|
|
|
|
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
|
|
|
|
|
(Self { validator, to_validation_task, metrics }, task)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -196,7 +192,7 @@ impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
|
|
|
|
|
where
|
|
|
|
|
V: TransactionValidator + 'static,
|
|
|
|
|
{
|
|
|
|
|
type Transaction = <V as TransactionValidator>::Transaction;
|
|
|
|
|
type Transaction = V::Transaction;
|
|
|
|
|
|
|
|
|
|
async fn validate_transaction(
|
|
|
|
|
&self,
|
|
|
|
|
@@ -205,75 +201,72 @@ where
|
|
|
|
|
) -> TransactionValidationOutcome<Self::Transaction> {
|
|
|
|
|
let hash = *transaction.hash();
|
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
|
{
|
|
|
|
|
let res = {
|
|
|
|
|
let to_validation_task = self.to_validation_task.clone();
|
|
|
|
|
let validator = self.validator.clone();
|
|
|
|
|
let fut = Box::pin(async move {
|
|
|
|
|
let res = validator.validate_transaction(origin, transaction).await;
|
|
|
|
|
let _ = tx.send(res);
|
|
|
|
|
});
|
|
|
|
|
let to_validation_task = to_validation_task.lock().await;
|
|
|
|
|
to_validation_task.send(fut).await
|
|
|
|
|
};
|
|
|
|
|
if res.is_err() {
|
|
|
|
|
return TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match rx.await {
|
|
|
|
|
Ok(res) => res,
|
|
|
|
|
Err(_) => TransactionValidationOutcome::Error(
|
|
|
|
|
self.metrics.inflight_validation_jobs.increment(1);
|
|
|
|
|
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
|
|
|
|
|
return TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Wait for the result
|
|
|
|
|
rx.await.unwrap_or_else(|_| {
|
|
|
|
|
TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn validate_transactions(
|
|
|
|
|
&self,
|
|
|
|
|
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
|
|
|
|
|
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
|
|
|
|
|
let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
|
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
|
{
|
|
|
|
|
let res = {
|
|
|
|
|
let to_validation_task = self.to_validation_task.clone();
|
|
|
|
|
let validator = self.validator.clone();
|
|
|
|
|
let fut = Box::pin(async move {
|
|
|
|
|
let res = validator.validate_transactions(transactions).await;
|
|
|
|
|
let _ = tx.send(res);
|
|
|
|
|
});
|
|
|
|
|
let to_validation_task = to_validation_task.lock().await;
|
|
|
|
|
to_validation_task.send(fut).await
|
|
|
|
|
};
|
|
|
|
|
if res.is_err() {
|
|
|
|
|
let len = transactions.len();
|
|
|
|
|
if len == 0 {
|
|
|
|
|
return Vec::new();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create oneshot channels for all transactions
|
|
|
|
|
let mut receivers = Vec::with_capacity(len);
|
|
|
|
|
let mut hashes = Vec::with_capacity(len);
|
|
|
|
|
|
|
|
|
|
for (origin, transaction) in transactions {
|
|
|
|
|
let hash = *transaction.hash();
|
|
|
|
|
hashes.push(hash);
|
|
|
|
|
|
|
|
|
|
let (tx, rx) = oneshot::channel();
|
|
|
|
|
receivers.push((hash, rx));
|
|
|
|
|
|
|
|
|
|
self.metrics.inflight_validation_jobs.increment(1);
|
|
|
|
|
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
|
|
|
|
|
return hashes
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|hash| {
|
|
|
|
|
.map(|h| {
|
|
|
|
|
TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
h,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
.collect();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
match rx.await {
|
|
|
|
|
Ok(res) => res,
|
|
|
|
|
Err(_) => hashes
|
|
|
|
|
.into_iter()
|
|
|
|
|
.map(|hash| {
|
|
|
|
|
TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
.collect(),
|
|
|
|
|
|
|
|
|
|
// Collect all results
|
|
|
|
|
let mut results = Vec::with_capacity(len);
|
|
|
|
|
for (hash, rx) in receivers {
|
|
|
|
|
let result = match rx.await {
|
|
|
|
|
Ok(res) => res,
|
|
|
|
|
Err(_) => TransactionValidationOutcome::Error(
|
|
|
|
|
hash,
|
|
|
|
|
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
|
|
|
|
|
),
|
|
|
|
|
};
|
|
|
|
|
results.push(result);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
results
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn validate_transactions_with_origin(
|
|
|
|
|
@@ -292,6 +285,58 @@ where
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A builder for [`TransactionValidationTaskExecutor`].
|
|
|
|
|
///
|
|
|
|
|
/// This builder holds a validator and configuration for spawning validation tasks.
|
|
|
|
|
/// Use [`Self::map`] to wrap the validator before spawning tasks.
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
|
pub struct TransactionValidationTaskExecutorBuilder<V> {
|
|
|
|
|
/// The validator to use for transaction validation.
|
|
|
|
|
validator: V,
|
|
|
|
|
/// Number of additional validation tasks to spawn.
|
|
|
|
|
additional_tasks: usize,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<V> TransactionValidationTaskExecutorBuilder<V> {
|
|
|
|
|
/// Creates a new builder with the given validator and number of additional tasks.
|
|
|
|
|
pub const fn new(validator: V, additional_tasks: usize) -> Self {
|
|
|
|
|
Self { validator, additional_tasks }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Maps the given validator to a new type.
|
|
|
|
|
pub fn map<F, T>(self, f: F) -> TransactionValidationTaskExecutorBuilder<T>
|
|
|
|
|
where
|
|
|
|
|
F: FnOnce(V) -> T,
|
|
|
|
|
{
|
|
|
|
|
TransactionValidationTaskExecutorBuilder {
|
|
|
|
|
validator: f(self.validator),
|
|
|
|
|
additional_tasks: self.additional_tasks,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Spawns validation tasks and returns the executor.
|
|
|
|
|
pub fn build_and_spawn<S>(self, spawner: S) -> TransactionValidationTaskExecutor<V>
|
|
|
|
|
where
|
|
|
|
|
V: TransactionValidator + 'static,
|
|
|
|
|
S: TaskSpawner,
|
|
|
|
|
{
|
|
|
|
|
let Self { validator, additional_tasks } = self;
|
|
|
|
|
let validator = Arc::new(validator);
|
|
|
|
|
let metrics = TxPoolValidatorMetrics::default();
|
|
|
|
|
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
|
|
|
|
|
|
|
|
|
|
// Spawn additional validation tasks
|
|
|
|
|
for _ in 0..additional_tasks {
|
|
|
|
|
spawner.spawn_blocking(Box::pin(task.clone().run()));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Spawn the critical validation task
|
|
|
|
|
spawner.spawn_critical_blocking("transaction-validation-service", Box::pin(task.run()));
|
|
|
|
|
|
|
|
|
|
TransactionValidationTaskExecutor { validator, to_validation_task, metrics }
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
|