refactor(txns): inline validation logic and remove validation.rs (#16668)

Signed-off-by: 7suyash7 <suyashnyn1@gmail.com>
This commit is contained in:
Suyash Nayan
2025-06-05 13:28:44 +05:30
committed by GitHub
parent 1efc666a13
commit 73b4073363
4 changed files with 15 additions and 184 deletions

View File

@@ -168,7 +168,6 @@ pub use manager::NetworkManager;
pub use metrics::TxTypesCounter;
pub use network::{NetworkHandle, NetworkProtocols};
pub use swarm::NetworkConnectionState;
pub use transactions::MessageFilter;
/// re-export p2p interfaces
pub use reth_network_p2p as p2p;

View File

@@ -28,14 +28,12 @@
use super::{
config::TransactionFetcherConfig,
constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
MessageFilter, PeerMetadata, PooledTransactions,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};
use alloy_consensus::transaction::PooledTransaction;
use alloy_primitives::TxHash;
@@ -60,7 +58,6 @@ use std::{
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::trace;
use validation::FilterOutcome;
/// The type responsible for fetching missing transactions from peers.
///
@@ -85,8 +82,6 @@ pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
pub hashes_pending_fetch: LruCache<TxHash>,
/// Tracks all hashes in the transaction fetcher.
pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
/// Filter for valid announcement and response data.
pub(super) filter_valid_message: MessageFilter,
/// Info on capacity of the transaction fetcher.
pub info: TransactionFetcherInfo,
#[doc(hidden)]
@@ -919,20 +914,19 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
//
let unvalidated_payload_len = verified_payload.len();
let (validation_outcome, valid_payload) =
self.filter_valid_message.partially_filter_valid_entries(verified_payload);
let valid_payload = verified_payload.dedup();
// todo: validate based on announced tx size/type and report peer for sending
// invalid response <https://github.com/paradigmxyz/reth/issues/6529>. requires
// passing the rlp encoded length down from active session along with the decoded
// tx.
if validation_outcome == FilterOutcome::ReportPeer {
if valid_payload.len() != unvalidated_payload_len {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
unvalidated_payload_len,
valid_payload_len=valid_payload.len(),
"received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
peer_id=format!("{peer_id:#}"),
unvalidated_payload_len,
valid_payload_len=valid_payload.len(),
"received `PooledTransactions` response from peer with duplicate entries, filtered them out"
);
}
// valid payload will have at least one transaction at this point. even if the tx
@@ -1014,7 +1008,6 @@ impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
hashes_fetch_inflight_and_pending_fetch: LruMap::new(
DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
),
filter_valid_message: Default::default(),
info: TransactionFetcherInfo::default(),
metrics: Default::default(),
}

View File

@@ -8,7 +8,6 @@ pub mod constants;
pub mod fetcher;
/// Defines the [`TransactionPolicies`] trait for aggregating transaction-related policies.
pub mod policy;
pub mod validation;
pub use self::constants::{
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
@@ -20,7 +19,6 @@ pub use config::{
TransactionPropagationPolicy, TransactionsManagerConfig,
};
use policy::{NetworkPolicies, TransactionPolicies};
pub use validation::*;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
@@ -596,10 +594,15 @@ impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
}
// 1. filter out spam
let (validation_outcome, mut partially_valid_msg) =
self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
if msg.is_empty() {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
return;
}
if validation_outcome == FilterOutcome::ReportPeer {
let original_len = msg.len();
let mut partially_valid_msg = msg.dedup();
if partially_valid_msg.len() != original_len {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}

View File

@@ -1,164 +0,0 @@
//! Validation of [`NewPooledTransactionHashes66`](reth_eth_wire::NewPooledTransactionHashes66)
//! and [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68)
//! announcements. Validation and filtering of announcements is network dependent.
use alloy_primitives::Signature;
use derive_more::{Deref, DerefMut};
use reth_eth_wire::{DedupPayload, HandleMempoolData, PartiallyValidData};
use std::{fmt, fmt::Display, mem};
use tracing::trace;
/// The size of a decoded signature in bytes.
pub const SIGNATURE_DECODED_SIZE_BYTES: usize = mem::size_of::<Signature>();
/// Outcomes from validating a `(ty, hash, size)` entry from a
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to the
/// caller how to deal with an announcement entry and the peer who sent the announcement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationOutcome {
/// Tells the caller to keep the entry in the announcement for fetch.
Fetch,
/// Tells the caller to filter out the entry from the announcement.
Ignore,
/// Tells the caller to filter out the entry from the announcement and penalize the peer. On
/// this outcome, caller can drop the announcement, that is up to each implementation.
ReportPeer,
}
/// Generic filter for announcements and responses. Checks for empty message and unique hashes/
/// transactions in message.
pub trait PartiallyFilterMessage {
/// Removes duplicate entries from a mempool message. Returns [`FilterOutcome::ReportPeer`] if
/// the caller should penalize the peer, otherwise [`FilterOutcome::Ok`].
fn partially_filter_valid_entries<V>(
&self,
msg: impl DedupPayload<Value = V> + fmt::Debug,
) -> (FilterOutcome, PartiallyValidData<V>) {
// 1. checks if the announcement is empty
if msg.is_empty() {
trace!(target: "net::tx",
msg=?msg,
"empty payload"
);
return (FilterOutcome::ReportPeer, PartiallyValidData::empty_eth66())
}
// 2. checks if announcement is spam packed with duplicate hashes
let original_len = msg.len();
let partially_valid_data = msg.dedup();
(
if partially_valid_data.len() == original_len {
FilterOutcome::Ok
} else {
FilterOutcome::ReportPeer
},
partially_valid_data,
)
}
}
/// Outcome from filtering
/// [`NewPooledTransactionHashes68`](reth_eth_wire::NewPooledTransactionHashes68). Signals to caller
/// whether to penalize the sender of the announcement or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterOutcome {
/// Peer behaves appropriately.
Ok,
/// A penalty should be flagged for the peer. Peer sent an announcement with unacceptably
/// invalid entries.
ReportPeer,
}
/// A generic wrapper for types that provide message filtering capabilities.
///
/// This struct is typically used with types implementing traits like [`PartiallyFilterMessage`],
/// which perform initial stateless validation on network messages, such as checking for empty
/// payloads or removing duplicate entries.
#[derive(Debug, Default, Deref, DerefMut)]
pub struct MessageFilter<N = EthMessageFilter>(N);
/// Filter for announcements containing EIP [`reth_ethereum_primitives::TxType`]s.
#[derive(Debug, Default)]
pub struct EthMessageFilter;
impl Display for EthMessageFilter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "EthMessageFilter")
}
}
impl PartiallyFilterMessage for EthMessageFilter {}
#[cfg(test)]
mod test {
use super::*;
use alloy_primitives::B256;
use reth_eth_wire::{NewPooledTransactionHashes66, NewPooledTransactionHashes68};
use std::{collections::HashMap, str::FromStr};
#[test]
fn eth68_empty_announcement() {
let types = vec![];
let sizes = vec![];
let hashes = vec![];
let announcement = NewPooledTransactionHashes68 { types, sizes, hashes };
let filter = EthMessageFilter;
let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
}
#[test]
fn eth66_empty_announcement() {
let hashes = vec![];
let announcement = NewPooledTransactionHashes66(hashes);
let filter: MessageFilter = MessageFilter::default();
let (outcome, _partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
}
#[test]
fn eth66_announcement_duplicate_tx_hash() {
// first three or the same
let hashes = vec![
B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefbbbb") // dup1
.unwrap(),
B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // dup2
.unwrap(),
B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // removed dup2
.unwrap(),
B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafa") // removed dup2
.unwrap(),
B256::from_str("0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefbbbb") // removed dup1
.unwrap(),
];
let announcement = NewPooledTransactionHashes66(hashes.clone());
let filter: MessageFilter = MessageFilter::default();
let (outcome, partially_valid_data) = filter.partially_filter_valid_entries(announcement);
assert_eq!(outcome, FilterOutcome::ReportPeer);
let mut expected_data = HashMap::default();
expected_data.insert(hashes[1], None);
expected_data.insert(hashes[0], None);
assert_eq!(expected_data, partially_valid_data.into_data())
}
#[test]
fn test_display_for_zst() {
let filter = EthMessageFilter;
assert_eq!("EthMessageFilter", &filter.to_string());
}
}