diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index f6409fe84b..860dc9c929 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -241,12 +241,24 @@ impl NetworkConfigBuilder { self.listener_addr(addr).discovery_addr(addr) } - /// Sets the socket address the network will listen on + /// Sets the socket address the network will listen on. + /// + /// By default, this is [Ipv4Addr::UNSPECIFIED] on [DEFAULT_DISCOVERY_PORT] pub fn listener_addr(mut self, listener_addr: SocketAddr) -> Self { self.listener_addr = Some(listener_addr); self } + /// Sets the port of the address the network will listen on. + /// + /// By default, this is [DEFAULT_DISCOVERY_PORT] + pub fn listener_port(mut self, port: u16) -> Self { + self.listener_addr + .get_or_insert(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT).into()) + .set_port(port); + self + } + /// Sets the socket address the discovery network will listen on pub fn discovery_addr(mut self, discovery_addr: SocketAddr) -> Self { self.discovery_addr = Some(discovery_addr); @@ -277,6 +289,11 @@ impl NetworkConfigBuilder { self } + /// Disables all discovery. + pub fn no_discovery(self) -> Self { + self.no_discv4_discovery().no_dns_discovery() + } + /// Sets the boot nodes. pub fn boot_nodes(mut self, nodes: impl IntoIterator) -> Self { self.boot_nodes = nodes.into_iter().collect(); diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 17f927c119..2847436e54 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -318,7 +318,7 @@ where } } - /// Handles dedicated transaction events related tot the `eth` protocol. + /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { @@ -711,10 +711,12 @@ pub enum NetworkTransactionEvent { #[cfg(test)] mod tests { use super::*; - use crate::{NetworkConfigBuilder, NetworkManager}; + use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager}; use reth_interfaces::sync::{SyncState, SyncStateUpdater}; + use reth_network_api::NetworkInfo; use reth_provider::test_utils::NoopProvider; - use reth_transaction_pool::test_utils::testing_pool; + use reth_rlp::Decodable; + use reth_transaction_pool::test_utils::{testing_pool, MockTransaction}; use secp256k1::SecretKey; #[tokio::test(flavor = "multi_thread")] @@ -726,7 +728,8 @@ mod tests { let client = NoopProvider::default(); let pool = testing_pool(); - let config = NetworkConfigBuilder::new(secret_key).build(client); + let config = + NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client); let (handle, network, mut transactions, _) = NetworkManager::new(config) .await .unwrap() @@ -737,7 +740,7 @@ mod tests { tokio::task::spawn(network); handle.update_sync_state(SyncState::Downloading { target_block: 100 }); - assert!(handle.is_syncing()); + assert!(NetworkInfo::is_syncing(&handle)); let peer_id = PeerId::random(); @@ -748,4 +751,161 @@ mod tests { assert!(pool.is_empty()); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_incoming_transactions() { + reth_tracing::init_test_tracing(); + let net = Testnet::create(3).await; + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + + drop(handles); + let handle = net.spawn(); + + let listener0 = handle0.event_listener(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let client = NoopProvider::default(); + let pool = testing_pool(); + let config = + NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client); + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + tokio::task::spawn(network); + + network_handle.update_sync_state(SyncState::Idle); + + assert!(!NetworkInfo::is_syncing(&network_handle)); + + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + capabilities, + messages, + status, + version, + } => { + // to insert a new peer in transactions peerset + transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + capabilities, + messages, + status, + version, + }) + } + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + // random tx: + let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap(); + let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap(); + transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions { + peer_id: *handle1.peer_id(), + msg: Transactions(vec![signed_tx.clone()]), + }); + assert_eq!( + *handle1.peer_id(), + transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0] + ); + handle.terminate().await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_on_get_pooled_transactions_network() { + reth_tracing::init_test_tracing(); + let net = Testnet::create(2).await; + + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + + drop(handles); + let handle = net.spawn(); + + let listener0 = handle0.event_listener(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let secret_key = SecretKey::new(&mut rand::thread_rng()); + + let client = NoopProvider::default(); + let pool = testing_pool(); + let config = + NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client); + let (network_handle, network, mut transactions, _) = NetworkManager::new(config) + .await + .unwrap() + .into_builder() + .transactions(pool.clone()) + .split_with_handle(); + tokio::task::spawn(network); + + network_handle.update_sync_state(SyncState::Idle); + + assert!(!NetworkInfo::is_syncing(&network_handle)); + + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionEstablished { + peer_id, + capabilities, + messages, + status, + version, + } => transactions.on_network_event(NetworkEvent::SessionEstablished { + peer_id, + capabilities, + messages, + status, + version, + }), + NetworkEvent::PeerAdded(_peer_id) => continue, + ev => { + panic!("unexpected event {ev:?}") + } + } + } + handle.terminate().await; + + let tx = MockTransaction::eip1559(); + let _ = transactions + .pool + .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone()) + .await; + + let request = GetPooledTransactions(vec![tx.get_hash()]); + + let (send, receive) = oneshot::channel::>(); + + transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions { + peer_id: *handle1.peer_id(), + request, + response: send, + }); + + match receive.await.unwrap() { + Ok(PooledTransactions(transactions)) => { + assert_eq!(transactions.len(), 1); + } + Err(e) => { + panic!("error: {:?}", e); + } + } + } } diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index 5da0ad56f1..47fc930627 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -12,9 +12,9 @@ use rand::{ prelude::Distribution, }; use reth_primitives::{ - constants::MIN_PROTOCOL_BASE_FEE, Address, FromRecoveredTransaction, IntoRecoveredTransaction, - Transaction, TransactionKind, TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy, - TxType, H256, U128, U256, + constants::MIN_PROTOCOL_BASE_FEE, hex, Address, FromRecoveredTransaction, + IntoRecoveredTransaction, Signature, Transaction, TransactionKind, TransactionSigned, + TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy, TxType, H256, U128, U256, }; use std::{ops::Range, sync::Arc, time::Instant}; @@ -415,7 +415,25 @@ impl FromRecoveredTransaction for MockTransaction { impl IntoRecoveredTransaction for MockTransaction { fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered { - todo!() + let tx = Transaction::Legacy(TxLegacy { + chain_id: self.chain_id(), + nonce: self.get_nonce(), + gas_price: self.get_gas_price(), + gas_limit: self.get_gas_limit(), + to: TransactionKind::Call(Address::from_slice( + &hex::decode("d3e8763675e4c425df46cc3b5c0f6cbdac396046").unwrap()[..], + )), + value: 693361000000000u64.into(), + input: Default::default(), + }); + + let signed_tx = TransactionSigned { + hash: *self.hash(), + signature: Signature::default(), + transaction: tx, + }; + + TransactionSignedEcRecovered::from_signed_transaction(signed_tx, self.sender()) } }