Compare commits

..

1 Commits

Author SHA1 Message Date
Matthias Seitz
5c04d1abe1 fix: allow smaller header size 2025-12-16 17:08:53 +01:00
6 changed files with 180 additions and 216 deletions

View File

@@ -1218,7 +1218,9 @@ impl ReverseHeadersDownloaderBuilder {
next_request_block_number: 0,
next_chain_tip_block_number: 0,
lowest_validated_header: None,
request_limit,
// TODO(mattsse): tmp hotfix to prevent issues with syncing from besu which has an upper
// limit of 512
request_limit: request_limit.min(512),
min_concurrent_requests,
max_concurrent_requests,
stream_batch_size,

View File

@@ -995,15 +995,14 @@ where
.additional_validation_tasks
.unwrap_or_else(|| ctx.config().txpool.additional_validation_tasks),
)
.into_tasks_builder(blob_store.clone())
.build_with_tasks(ctx.task_executor().clone(), blob_store.clone())
.map(|validator| {
OpTransactionValidator::new(validator)
// In --dev mode we can't require gas fees because we're unable to decode
// the L1 block info
.require_l1_data_gas_fee(!ctx.config().dev.dev)
.with_supervisor(supervisor_client.clone())
})
.build_and_spawn(ctx.task_executor().clone());
});
let final_pool_config = pool_config_overrides.apply(ctx.pool_config());

View File

@@ -36,6 +36,7 @@ futures-util.workspace = true
parking_lot.workspace = true
pin-project.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true
# metrics
reth-metrics.workspace = true
@@ -73,7 +74,6 @@ tempfile.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
futures.workspace = true
tokio-stream.workspace = true
[features]
serde = [

View File

@@ -148,7 +148,7 @@ pub struct TxPoolValidationMetrics {
}
/// Transaction pool validator task metrics
#[derive(Metrics, Clone)]
#[derive(Metrics)]
#[metrics(scope = "transaction_pool")]
pub struct TxPoolValidatorMetrics {
/// Number of in-flight validation job sends waiting for channel capacity

View File

@@ -8,12 +8,10 @@ use crate::{
},
metrics::TxPoolValidationMetrics,
traits::TransactionOrigin,
validate::{
task::TransactionValidationTaskExecutorBuilder, ValidTransaction, MAX_INIT_CODE_BYTE_SIZE,
},
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_BYTE_SIZE},
Address, BlobTransactionSidecarVariant, EthBlobTransactionSidecar, EthPoolTransaction,
LocalTransactionConfig, PoolTransaction, TransactionValidationOutcome,
TransactionValidationTaskExecutor, TransactionValidator,
LocalTransactionConfig, TransactionValidationOutcome, TransactionValidationTaskExecutor,
TransactionValidator,
};
use alloy_consensus::{
@@ -43,6 +41,7 @@ use std::{
},
time::{Instant, SystemTime},
};
use tokio::sync::Mutex;
/// A [`TransactionValidator`] implementation that validates ethereum transaction.
///
@@ -1165,19 +1164,6 @@ impl<Client> EthTransactionValidatorBuilder<Client> {
}
}
/// Creates a [`TransactionValidationTaskExecutorBuilder`].
pub fn into_tasks_builder<Tx, S>(
self,
blob_store: S,
) -> TransactionValidationTaskExecutorBuilder<EthTransactionValidator<Client, Tx>>
where
S: BlobStore,
{
let additional_tasks = self.additional_tasks;
let validator = self.build(blob_store);
TransactionValidationTaskExecutorBuilder::new(validator, additional_tasks)
}
/// Builds a [`EthTransactionValidator`] and spawns validation tasks via the
/// [`TransactionValidationTaskExecutor`]
///
@@ -1192,10 +1178,32 @@ impl<Client> EthTransactionValidatorBuilder<Client> {
where
T: TaskSpawner,
S: BlobStore,
Tx: PoolTransaction,
EthTransactionValidator<Client, Tx>: TransactionValidator<Transaction = Tx> + 'static,
{
self.into_tasks_builder(blob_store).build_and_spawn(tasks)
let additional_tasks = self.additional_tasks;
let validator = self.build(blob_store);
let (tx, task) = ValidationTask::new();
// Spawn validation tasks, they are blocking because they perform db lookups
for _ in 0..additional_tasks {
let task = task.clone();
tasks.spawn_blocking(Box::pin(async move {
task.run().await;
}));
}
// we spawn them on critical tasks because validation, especially for EIP-4844 can be quite
// heavy
tasks.spawn_critical_blocking(
"transaction-validation-service",
Box::pin(async move {
task.run().await;
}),
);
let to_validation_task = Arc::new(Mutex::new(tx));
TransactionValidationTaskExecutor { validator: Arc::new(validator), to_validation_task }
}
}

View File

@@ -4,142 +4,142 @@ use crate::{
blobstore::BlobStore,
metrics::TxPoolValidatorMetrics,
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
EthPoolTransaction, EthPooledTransaction, EthTransactionValidator, PoolTransaction,
TransactionOrigin, TransactionValidationOutcome, TransactionValidator,
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
TransactionValidator,
};
use futures_util::lock::Mutex;
use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
use futures_util::{lock::Mutex, StreamExt};
use reth_primitives_traits::{Block, SealedBlock};
use reth_storage_api::{noop::NoopProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::{
sync,
sync::{mpsc, oneshot},
};
use tokio_stream::wrappers::ReceiverStream;
/// A validation job for a single transaction.
///
/// Contains the transaction to validate, its origin, and a channel to send the result back.
type ValidationJob<T> = (TransactionOrigin, T, oneshot::Sender<TransactionValidationOutcome<T>>);
/// Represents a future outputting unit type and is sendable.
type ValidationFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// A service that performs transaction validation jobs.
/// Represents a stream of validation futures.
type ValidationStream = ReceiverStream<ValidationFuture>;
/// A service that performs validation jobs.
///
/// This listens for incoming transactions, batches them using `recv_many`,
/// and validates them in batches for improved performance.
/// This listens for incoming validation jobs and executes them.
///
/// This should be spawned as a task: [`ValidationTask::run`]
#[derive(Debug)]
pub struct ValidationTask<V: TransactionValidator> {
/// Shared receiver for validation jobs - multiple tasks can compete for work.
validation_jobs: Arc<Mutex<mpsc::UnboundedReceiver<ValidationJob<V::Transaction>>>>,
/// The validator used to validate transactions.
validator: Arc<V>,
/// Metrics for the validation task.
metrics: TxPoolValidatorMetrics,
#[derive(Clone)]
pub struct ValidationTask {
validation_jobs: Arc<Mutex<ValidationStream>>,
}
impl<V: TransactionValidator> Clone for ValidationTask<V> {
fn clone(&self) -> Self {
Self {
validator: self.validator.clone(),
validation_jobs: self.validation_jobs.clone(),
metrics: self.metrics.clone(),
}
}
}
impl<V: TransactionValidator> ValidationTask<V> {
/// Maximum number of transactions to batch in a single validation call.
const BATCH_SIZE: usize = 64;
impl ValidationTask {
/// Creates a new cloneable task pair.
///
/// The sender sends new (transaction) validation tasks to an available validation task.
pub fn new(
validator: Arc<V>,
metrics: TxPoolValidatorMetrics,
) -> (mpsc::UnboundedSender<ValidationJob<V::Transaction>>, Self) {
let (tx, rx) = mpsc::unbounded_channel();
(tx, Self { validator, validation_jobs: Arc::new(Mutex::new(rx)), metrics })
pub fn new() -> (ValidationJobSender, Self) {
Self::with_capacity(1)
}
/// Executes validation jobs, batching transactions for efficiency.
/// Creates a new cloneable task pair with the given channel capacity.
pub fn with_capacity(capacity: usize) -> (ValidationJobSender, Self) {
let (tx, rx) = mpsc::channel(capacity);
let metrics = TxPoolValidatorMetrics::default();
(ValidationJobSender { tx, metrics }, Self::with_receiver(rx))
}
/// Creates a new task with the given receiver.
pub fn with_receiver(jobs: mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>) -> Self {
Self { validation_jobs: Arc::new(Mutex::new(ReceiverStream::new(jobs))) }
}
/// Executes all new validation jobs that come in.
///
/// This will run as long as the channel is alive and is expected to be spawned as a task.
pub async fn run(self) {
let mut buffer = Vec::with_capacity(Self::BATCH_SIZE);
loop {
// Lock the receiver and batch receive transactions
let count =
self.validation_jobs.lock().await.recv_many(&mut buffer, Self::BATCH_SIZE).await;
if count == 0 {
// Channel closed, exit
break;
}
self.metrics.inflight_validation_jobs.decrement(count as f64);
// Split into transactions and response senders
#[expect(clippy::iter_with_drain)]
let (txs, senders): (Vec<_>, Vec<_>) =
buffer.drain(..).map(|(origin, tx, sender)| ((origin, tx), sender)).unzip();
// Batch validate all transactions
let results = self.validator.validate_transactions(txs).await;
// Send results back through oneshot channels
for (result, sender) in results.into_iter().zip(senders) {
let _ = sender.send(result);
}
while let Some(task) = self.validation_jobs.lock().await.next().await {
task.await;
}
}
}
impl std::fmt::Debug for ValidationTask {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ValidationTask").field("validation_jobs", &"...").finish()
}
}
/// A sender new type for sending validation jobs to [`ValidationTask`].
#[derive(Debug)]
pub struct ValidationJobSender {
tx: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
metrics: TxPoolValidatorMetrics,
}
impl ValidationJobSender {
/// Sends the given job to the validation task.
pub async fn send(
&self,
job: Pin<Box<dyn Future<Output = ()> + Send>>,
) -> Result<(), TransactionValidatorError> {
self.metrics.inflight_validation_jobs.increment(1);
let res = self
.tx
.send(job)
.await
.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable);
self.metrics.inflight_validation_jobs.decrement(1);
res
}
}
/// A [`TransactionValidator`] implementation that validates ethereum transaction.
/// This validator is non-blocking, all validation work is done in a separate task.
#[derive(Debug)]
pub struct TransactionValidationTaskExecutor<V: TransactionValidator> {
pub struct TransactionValidationTaskExecutor<V> {
/// The validator that will validate transactions on a separate task.
pub validator: Arc<V>,
/// The sender half to validation tasks that perform the actual validation.
pub to_validation_task: mpsc::UnboundedSender<ValidationJob<V::Transaction>>,
/// Metrics for the validator task executor.
pub metrics: TxPoolValidatorMetrics,
pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
}
impl<V: TransactionValidator> Clone for TransactionValidationTaskExecutor<V> {
impl<V> Clone for TransactionValidationTaskExecutor<V> {
fn clone(&self) -> Self {
Self {
validator: self.validator.clone(),
to_validation_task: self.to_validation_task.clone(),
metrics: self.metrics.clone(),
}
}
}
// === impl TransactionValidationTaskExecutor ===
impl
TransactionValidationTaskExecutor<EthTransactionValidator<NoopProvider, EthPooledTransaction>>
{
impl TransactionValidationTaskExecutor<()> {
/// Convenience method to create a [`EthTransactionValidatorBuilder`]
pub fn eth_builder<Client>(client: Client) -> EthTransactionValidatorBuilder<Client> {
EthTransactionValidatorBuilder::new(client)
}
}
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
impl<V> TransactionValidationTaskExecutor<V> {
/// Maps the given validator to a new type.
pub fn map<F, T>(self, mut f: F) -> TransactionValidationTaskExecutor<T>
where
F: FnMut(V) -> T,
{
TransactionValidationTaskExecutor {
validator: Arc::new(f(Arc::into_inner(self.validator).unwrap())),
to_validation_task: self.to_validation_task,
}
}
/// Returns the validator.
pub fn validator(&self) -> &V {
&self.validator
}
}
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
Client: ChainSpecProvider<ChainSpec: EthereumHardforks> + StateProviderFactory + 'static,
Tx: EthPoolTransaction,
{
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
/// Creates a new instance for the given client
///
/// This will spawn a single validation tasks that performs the actual validation.
@@ -175,16 +175,20 @@ where
}
}
impl<V: TransactionValidator> TransactionValidationTaskExecutor<V> {
impl<V> TransactionValidationTaskExecutor<V> {
/// Creates a new executor instance with the given validator for transaction validation.
///
/// Initializes the executor with the provided validator and sets up communication for
/// validation tasks.
pub fn new(validator: V) -> (Self, ValidationTask<V>) {
let validator = Arc::new(validator);
let metrics = TxPoolValidatorMetrics::default();
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
(Self { validator, to_validation_task, metrics }, task)
pub fn new(validator: V) -> (Self, ValidationTask) {
let (tx, task) = ValidationTask::new();
(
Self {
validator: Arc::new(validator),
to_validation_task: Arc::new(sync::Mutex::new(tx)),
},
task,
)
}
}
@@ -192,7 +196,7 @@ impl<V> TransactionValidator for TransactionValidationTaskExecutor<V>
where
V: TransactionValidator + 'static,
{
type Transaction = V::Transaction;
type Transaction = <V as TransactionValidator>::Transaction;
async fn validate_transaction(
&self,
@@ -201,72 +205,75 @@ where
) -> TransactionValidationOutcome<Self::Transaction> {
let hash = *transaction.hash();
let (tx, rx) = oneshot::channel();
self.metrics.inflight_validation_jobs.increment(1);
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
);
{
let res = {
let to_validation_task = self.to_validation_task.clone();
let validator = self.validator.clone();
let fut = Box::pin(async move {
let res = validator.validate_transaction(origin, transaction).await;
let _ = tx.send(res);
});
let to_validation_task = to_validation_task.lock().await;
to_validation_task.send(fut).await
};
if res.is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
);
}
}
// Wait for the result
rx.await.unwrap_or_else(|_| {
TransactionValidationOutcome::Error(
match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
})
),
}
}
async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
let len = transactions.len();
if len == 0 {
return Vec::new();
}
// Create oneshot channels for all transactions
let mut receivers = Vec::with_capacity(len);
let mut hashes = Vec::with_capacity(len);
for (origin, transaction) in transactions {
let hash = *transaction.hash();
hashes.push(hash);
let (tx, rx) = oneshot::channel();
receivers.push((hash, rx));
self.metrics.inflight_validation_jobs.increment(1);
if self.to_validation_task.send((origin, transaction, tx)).is_err() {
let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
let (tx, rx) = oneshot::channel();
{
let res = {
let to_validation_task = self.to_validation_task.clone();
let validator = self.validator.clone();
let fut = Box::pin(async move {
let res = validator.validate_transactions(transactions).await;
let _ = tx.send(res);
});
let to_validation_task = to_validation_task.lock().await;
to_validation_task.send(fut).await
};
if res.is_err() {
return hashes
.into_iter()
.map(|h| {
.map(|hash| {
TransactionValidationOutcome::Error(
h,
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
})
.collect();
}
}
// Collect all results
let mut results = Vec::with_capacity(len);
for (hash, rx) in receivers {
let result = match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
),
};
results.push(result);
match rx.await {
Ok(res) => res,
Err(_) => hashes
.into_iter()
.map(|hash| {
TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
})
.collect(),
}
results
}
async fn validate_transactions_with_origin(
@@ -285,58 +292,6 @@ where
}
}
/// A builder for [`TransactionValidationTaskExecutor`].
///
/// This builder holds a validator and configuration for spawning validation tasks.
/// Use [`Self::map`] to wrap the validator before spawning tasks.
#[derive(Debug)]
pub struct TransactionValidationTaskExecutorBuilder<V> {
/// The validator to use for transaction validation.
validator: V,
/// Number of additional validation tasks to spawn.
additional_tasks: usize,
}
impl<V> TransactionValidationTaskExecutorBuilder<V> {
/// Creates a new builder with the given validator and number of additional tasks.
pub const fn new(validator: V, additional_tasks: usize) -> Self {
Self { validator, additional_tasks }
}
/// Maps the given validator to a new type.
pub fn map<F, T>(self, f: F) -> TransactionValidationTaskExecutorBuilder<T>
where
F: FnOnce(V) -> T,
{
TransactionValidationTaskExecutorBuilder {
validator: f(self.validator),
additional_tasks: self.additional_tasks,
}
}
/// Spawns validation tasks and returns the executor.
pub fn build_and_spawn<S>(self, spawner: S) -> TransactionValidationTaskExecutor<V>
where
V: TransactionValidator + 'static,
S: TaskSpawner,
{
let Self { validator, additional_tasks } = self;
let validator = Arc::new(validator);
let metrics = TxPoolValidatorMetrics::default();
let (to_validation_task, task) = ValidationTask::new(validator.clone(), metrics.clone());
// Spawn additional validation tasks
for _ in 0..additional_tasks {
spawner.spawn_blocking(Box::pin(task.clone().run()));
}
// Spawn the critical validation task
spawner.spawn_critical_blocking("transaction-validation-service", Box::pin(task.run()));
TransactionValidationTaskExecutor { validator, to_validation_task, metrics }
}
}
#[cfg(test)]
mod tests {
use super::*;