refactor(optimism_txpool): Move interop revalidation logic to SupervisorClient stream (#16148)

Signed-off-by: 7suyash7 <suyashnyn1@gmail.com>
This commit is contained in:
Suyash Nayan
2025-05-13 13:41:03 +05:30
committed by GitHub
parent e5d59bad7e
commit 5ac2957d70
2 changed files with 93 additions and 44 deletions

View File

@@ -12,14 +12,14 @@ use crate::{
interop::{is_stale_interop, is_valid_interop, MaybeInteropTransaction},
supervisor::SupervisorClient,
};
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader, Transaction};
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use metrics::Gauge;
use reth_chain_state::CanonStateNotification;
use reth_metrics::{metrics::Counter, Metrics};
use reth_primitives_traits::NodePrimitives;
use reth_transaction_pool::{error::PoolTransactionError, PoolTransaction, TransactionPool};
use std::sync::Arc;
use tracing::warn;
/// Transaction pool maintenance metrics
#[derive(Metrics)]
@@ -153,66 +153,65 @@ pub async fn maintain_transaction_pool_interop<N, Pool, St>(
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
let metrics = MaintainPoolInteropMetrics::default();
let supervisor_client = Arc::new(supervisor_client);
loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
let timestamp = new.tip().timestamp();
let mut to_remove = Vec::new();
let mut to_revalidate = Vec::new();
let mut to_revalidate: Vec<<Pool as TransactionPool>::Transaction> = Vec::new();
let mut interop_count = 0;
for tx in &pool.pooled_transactions() {
// Only interop txs have this field set
if let Some(interop) = tx.transaction.interop_deadline() {
for tx_arc_wrapper in pool.pooled_transactions() {
if let Some(interop_deadline_val) = tx_arc_wrapper.transaction.interop_deadline() {
interop_count += 1;
if !is_valid_interop(interop, timestamp) {
// That means tx didn't revalidated during [`OFFSET_TIME`] time
// We could assume that it won't be validated at all and remove it
to_remove.push(*tx.hash());
} else if is_stale_interop(interop, timestamp, OFFSET_TIME) {
// If tx has less then [`OFFSET_TIME`] of valid time we revalidate it
to_revalidate.push(tx.clone())
if !is_valid_interop(interop_deadline_val, timestamp) {
to_remove.push(*tx_arc_wrapper.transaction.hash());
} else if is_stale_interop(interop_deadline_val, timestamp, OFFSET_TIME) {
to_revalidate.push(tx_arc_wrapper.transaction.clone());
}
}
}
metrics.set_interop_txs_in_pool(interop_count);
if !to_revalidate.is_empty() {
metrics.inc_stale_tx_interop(to_revalidate.len());
let checks_stream =
futures_util::stream::iter(to_revalidate.into_iter().map(|tx| {
let supervisor_client = supervisor_client.clone();
async move {
let check = supervisor_client
.is_valid_cross_tx(
tx.transaction.access_list(),
tx.transaction.hash(),
timestamp,
Some(TRANSACTION_VALIDITY_WINDOW),
// We could assume that interop is enabled, because
// tx.transaction.interop() would be set only in
// this case
true,
)
.await;
(tx.clone(), check)
let revalidation_stream = supervisor_client.revalidate_interop_txs_stream(
to_revalidate,
timestamp,
TRANSACTION_VALIDITY_WINDOW,
MAX_SUPERVISOR_QUERIES,
);
futures_util::pin_mut!(revalidation_stream);
while let Some((tx_item_from_stream, validation_result)) =
revalidation_stream.next().await
{
match validation_result {
Some(Ok(())) => {
tx_item_from_stream
.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW);
}
}))
.buffered(MAX_SUPERVISOR_QUERIES);
futures_util::pin_mut!(checks_stream);
while let Some((tx, check)) = checks_stream.next().await {
if let Some(Err(err)) = check {
// We remove only bad transaction. If error caused by supervisor instability
// or other fixable issues transaction would be validated on next state
// change, so we ignore it
if err.is_bad_transaction() {
to_remove.push(*tx.transaction.hash());
Some(Err(err)) => {
if err.is_bad_transaction() {
to_remove.push(*tx_item_from_stream.hash());
}
}
None => {
warn!(
target: "txpool",
hash = %tx_item_from_stream.hash(),
"Interop transaction no longer considered cross-chain during revalidation; removing."
);
to_remove.push(*tx_item_from_stream.hash());
}
} else {
tx.transaction.set_interop_deadline(timestamp + TRANSACTION_VALIDITY_WINDOW)
}
}
}
if !to_remove.is_empty() {
let removed = pool.remove_transactions(to_remove);
metrics.inc_removed_tx_interop(removed.len());

View File

@@ -1,17 +1,24 @@
//! This is our custom implementation of validator struct
use crate::{
interop::MaybeInteropTransaction,
supervisor::{
metrics::SupervisorMetrics, parse_access_list_items_to_inbox_entries, ExecutingDescriptor,
InteropTxValidatorError,
},
InvalidCrossTx,
};
use alloy_consensus::Transaction;
use alloy_eips::eip2930::AccessList;
use alloy_primitives::{TxHash, B256};
use alloy_rpc_client::ReqwestClient;
use futures_util::future::BoxFuture;
use futures_util::{
future::BoxFuture,
stream::{self, StreamExt},
Stream,
};
use op_alloy_consensus::interop::SafetyLevel;
use reth_transaction_pool::PoolTransaction;
use std::{
borrow::Cow,
future::IntoFuture,
@@ -111,6 +118,49 @@ impl SupervisorClient {
}
Some(Ok(()))
}
/// Creates a stream that revalidates interop transactions against the supervisor.
/// Returns
/// An implementation of `Stream` that is `Send`-able and tied to the lifetime `'a` of `self`.
/// Each item yielded by the stream is a tuple `(TItem, Option<Result<(), InvalidCrossTx>>)`.
/// - The first element is the original `TItem` that was revalidated.
/// - The second element is the `Option<Result<(), InvalidCrossTx>>` describes the outcome
/// - `None`: Transaction was not identified as a cross-chain candidate by initial checks.
/// - `Some(Ok(()))`: Supervisor confirmed the transaction is valid.
/// - `Some(Err(InvalidCrossTx))`: Supervisor indicated the transaction is invalid.
pub fn revalidate_interop_txs_stream<'a, TItem, InputIter>(
&'a self,
txs_to_revalidate: InputIter,
current_timestamp: u64,
revalidation_window: u64,
max_concurrent_queries: usize,
) -> impl Stream<Item = (TItem, Option<Result<(), InvalidCrossTx>>)> + Send + 'a
where
InputIter: IntoIterator<Item = TItem> + Send + 'a,
InputIter::IntoIter: Send + 'a,
TItem:
MaybeInteropTransaction + PoolTransaction + Transaction + Clone + Send + Sync + 'static,
{
stream::iter(txs_to_revalidate.into_iter().map(move |tx_item| {
let client_for_async_task = self.clone();
async move {
let validation_result = client_for_async_task
.is_valid_cross_tx(
tx_item.access_list(),
tx_item.hash(),
current_timestamp,
Some(revalidation_window),
true,
)
.await;
// return the original transaction paired with its validation result.
(tx_item, validation_result)
}
}))
.buffered(max_concurrent_queries)
}
}
/// Holds supervisor data. Inner type of [`SupervisorClient`].