Network integrations tests with txpool task (#2148)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Aditya Pandey
2023-04-09 17:01:14 +05:30
committed by GitHub
parent 13f59ea438
commit b2c3074260
3 changed files with 205 additions and 10 deletions

View File

@@ -241,12 +241,24 @@ impl NetworkConfigBuilder {
self.listener_addr(addr).discovery_addr(addr)
}
/// Sets the socket address the network will listen on
/// Sets the socket address the network will listen on.
///
/// By default, this is [Ipv4Addr::UNSPECIFIED] on [DEFAULT_DISCOVERY_PORT]
pub fn listener_addr(mut self, listener_addr: SocketAddr) -> Self {
self.listener_addr = Some(listener_addr);
self
}
/// Sets the port of the address the network will listen on.
///
/// By default, this is [DEFAULT_DISCOVERY_PORT]
pub fn listener_port(mut self, port: u16) -> Self {
self.listener_addr
.get_or_insert(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT).into())
.set_port(port);
self
}
/// Sets the socket address the discovery network will listen on
pub fn discovery_addr(mut self, discovery_addr: SocketAddr) -> Self {
self.discovery_addr = Some(discovery_addr);
@@ -277,6 +289,11 @@ impl NetworkConfigBuilder {
self
}
/// Disables all discovery.
pub fn no_discovery(self) -> Self {
self.no_discv4_discovery().no_dns_discovery()
}
/// Sets the boot nodes.
pub fn boot_nodes(mut self, nodes: impl IntoIterator<Item = NodeRecord>) -> Self {
self.boot_nodes = nodes.into_iter().collect();

View File

@@ -318,7 +318,7 @@ where
}
}
/// Handles dedicated transaction events related tot the `eth` protocol.
/// Handles dedicated transaction events related to the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
@@ -711,10 +711,12 @@ pub enum NetworkTransactionEvent {
#[cfg(test)]
mod tests {
use super::*;
use crate::{NetworkConfigBuilder, NetworkManager};
use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
use reth_interfaces::sync::{SyncState, SyncStateUpdater};
use reth_network_api::NetworkInfo;
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::test_utils::testing_pool;
use reth_rlp::Decodable;
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
use secp256k1::SecretKey;
#[tokio::test(flavor = "multi_thread")]
@@ -726,7 +728,8 @@ mod tests {
let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key).build(client);
let config =
NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client);
let (handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
@@ -737,7 +740,7 @@ mod tests {
tokio::task::spawn(network);
handle.update_sync_state(SyncState::Downloading { target_block: 100 });
assert!(handle.is_syncing());
assert!(NetworkInfo::is_syncing(&handle));
let peer_id = PeerId::random();
@@ -748,4 +751,161 @@ mod tests {
assert!(pool.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions() {
reth_tracing::init_test_tracing();
let net = Testnet::create(3).await;
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
drop(handles);
let handle = net.spawn();
let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();
let pool = testing_pool();
let config =
NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client);
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();
tokio::task::spawn(network);
network_handle.update_sync_state(SyncState::Idle);
assert!(!NetworkInfo::is_syncing(&network_handle));
// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
capabilities,
messages,
status,
version,
} => {
// to insert a new peer in transactions peerset
transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
messages,
status,
version,
})
}
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
// random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
let input = hex::decode("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76").unwrap();
let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
peer_id: *handle1.peer_id(),
msg: Transactions(vec![signed_tx.clone()]),
});
assert_eq!(
*handle1.peer_id(),
transactions.transactions_by_peers.get(&signed_tx.hash()).unwrap()[0]
);
handle.terminate().await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
reth_tracing::init_test_tracing();
let net = Testnet::create(2).await;
let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
drop(handles);
let handle = net.spawn();
let listener0 = handle0.event_listener();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();
let pool = testing_pool();
let config =
NetworkConfigBuilder::new(secret_key).no_discovery().listener_port(0).build(client);
let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();
tokio::task::spawn(network);
network_handle.update_sync_state(SyncState::Idle);
assert!(!NetworkInfo::is_syncing(&network_handle));
// wait for all initiator connections
let mut established = listener0.take(2);
while let Some(ev) = established.next().await {
match ev {
NetworkEvent::SessionEstablished {
peer_id,
capabilities,
messages,
status,
version,
} => transactions.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
messages,
status,
version,
}),
NetworkEvent::PeerAdded(_peer_id) => continue,
ev => {
panic!("unexpected event {ev:?}")
}
}
}
handle.terminate().await;
let tx = MockTransaction::eip1559();
let _ = transactions
.pool
.add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
.await;
let request = GetPooledTransactions(vec![tx.get_hash()]);
let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
peer_id: *handle1.peer_id(),
request,
response: send,
});
match receive.await.unwrap() {
Ok(PooledTransactions(transactions)) => {
assert_eq!(transactions.len(), 1);
}
Err(e) => {
panic!("error: {:?}", e);
}
}
}
}

View File

@@ -12,9 +12,9 @@ use rand::{
prelude::Distribution,
};
use reth_primitives::{
constants::MIN_PROTOCOL_BASE_FEE, Address, FromRecoveredTransaction, IntoRecoveredTransaction,
Transaction, TransactionKind, TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy,
TxType, H256, U128, U256,
constants::MIN_PROTOCOL_BASE_FEE, hex, Address, FromRecoveredTransaction,
IntoRecoveredTransaction, Signature, Transaction, TransactionKind, TransactionSigned,
TransactionSignedEcRecovered, TxEip1559, TxHash, TxLegacy, TxType, H256, U128, U256,
};
use std::{ops::Range, sync::Arc, time::Instant};
@@ -415,7 +415,25 @@ impl FromRecoveredTransaction for MockTransaction {
impl IntoRecoveredTransaction for MockTransaction {
fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered {
todo!()
let tx = Transaction::Legacy(TxLegacy {
chain_id: self.chain_id(),
nonce: self.get_nonce(),
gas_price: self.get_gas_price(),
gas_limit: self.get_gas_limit(),
to: TransactionKind::Call(Address::from_slice(
&hex::decode("d3e8763675e4c425df46cc3b5c0f6cbdac396046").unwrap()[..],
)),
value: 693361000000000u64.into(),
input: Default::default(),
});
let signed_tx = TransactionSigned {
hash: *self.hash(),
signature: Signature::default(),
transaction: tx,
};
TransactionSignedEcRecovered::from_signed_transaction(signed_tx, self.sender())
}
}