feat(net): test syncing from geth (#623)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Dan Cline
2023-01-31 00:35:02 -05:00
committed by GitHub
parent cc43b72835
commit f771e23f9a
7 changed files with 753 additions and 225 deletions

510
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,6 +21,7 @@ reth-network = {path = "../../crates/net/network", features = ["serde"] }
reth-primitives = { path = "../../crates/primitives" }
reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
reth-net-nat = { path = "../../crates/net/nat" }
reth-interfaces = { path = "../interfaces", optional = true }
# io
serde = "1.0"
@@ -32,4 +33,66 @@ walkdir = "2.3.2"
eyre = "0.6.8"
shellexpand = "3.0.0"
tracing = "0.1.37"
tempfile = "3.3.0"
# crypto
rand = { version = "0.8", optional = true }
# errors
thiserror = { version = "1", optional = true }
# enr
enr = { version = "0.7.0", features = ["serde", "rust-secp256k1"], optional = true }
# ethers
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false, optional = true }
ethers-providers = { git = "https://github.com/gakonst/ethers-rs", features = ["ws"], default-features = false, optional = true }
ethers-middleware = { git = "https://github.com/gakonst/ethers-rs", default-features = false, optional = true }
ethers-signers = { git = "https://github.com/gakonst/ethers-rs", default-features = false, optional = true }
# async / futures
async-trait = { version = "0.1", optional = true }
tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-thread", "time"], optional = true }
tokio-test = { version = "0.4", optional = true }
# misc
tempfile = { version = "3.3", optional = true }
hex = { version = "0.4", optional = true }
[dev-dependencies]
# reth crates
reth-tracing = { path = "../tracing" }
reth-stages = { path = "../stages" }
reth-downloaders = { path = "../net/downloaders" }
reth-staged-sync = { path = ".", features = ["test-utils"] }
# async/futures
futures = "0.3"
tokio = { version = "1", features = ["io-util", "net", "macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
# crypto
secp256k1 = { version = "0.24", features = [
"global-context",
"rand-std",
"recovery",
] }
[features]
test-utils = [
"reth-network/test-utils",
"reth-interfaces/test-utils",
"reth-network/test-utils",
"reth-provider/test-utils",
"dep:enr",
"dep:ethers-core",
"dep:tempfile",
"dep:thiserror",
"dep:hex",
"dep:rand",
"dep:tokio",
"dep:tokio-test",
"dep:ethers-signers",
"dep:ethers-providers",
"dep:ethers-middleware",
"dep:async-trait"
]

View File

@@ -2,3 +2,7 @@ pub mod config;
pub use config::Config;
pub mod utils;
#[cfg(any(test, feature = "test-utils"))]
/// Common helpers for integration testing.
pub mod test_utils;

View File

@@ -0,0 +1,122 @@
//! Helper struct for working with a clique geth instance.
use enr::k256::ecdsa::SigningKey;
use ethers_core::utils::{Geth, GethInstance};
use ethers_middleware::SignerMiddleware;
use ethers_providers::{Provider, Ws};
use ethers_signers::{LocalWallet, Wallet};
use std::{
io::{BufRead, BufReader},
net::SocketAddr,
};
/// A [`Geth`](ethers_core::utils::Geth) instance configured with Clique and a custom
/// [`Genesis`](ethers_core::utils::Genesis).
///
/// This holds a [`SignerMiddleware`](ethers_middleware::SignerMiddleware) for
/// enabling block production and creating transactions.
///
/// # Example
/// ```no_run
/// # use ethers_core::utils::Geth;
/// # use reth_staged_sync::test_utils::CliqueGethInstance;
/// # tokio_test::block_on(async {
///
/// // this creates a funded geth
/// let clique_geth = Geth::new()
/// .p2p_port(30303)
/// .chain_id(1337u64);
///
/// // build the funded geth, generating a random signing key and enabling clique
/// let mut clique = CliqueGethInstance::new(clique_geth, None).await;
///
/// // don't print logs, but drain the stderr
/// clique.prevent_blocking().await;
/// # });
/// ```
pub struct CliqueGethInstance {
/// The spawned [`GethInstance`](ethers_core::utils::GethInstance).
pub instance: GethInstance,
/// The provider who can talk to this instance
pub provider: SignerMiddleware<Provider<Ws>, Wallet<SigningKey>>,
}
impl CliqueGethInstance {
/// Sets up a new [`SignerMiddleware`](ethers_middleware::SignerMiddleware)
/// for the [`Geth`](ethers_core::utils::Geth) instance and returns the
/// [`CliqueGethInstance`](crate::test_utils::CliqueGethInstance).
///
/// The signer is assumed to be the clique signer and the signer for any transactions sent for
/// block production.
///
/// This also spawns the geth instance.
pub async fn new(geth: Geth, signer: Option<SigningKey>) -> Self {
let signer = signer.unwrap_or_else(|| SigningKey::random(&mut rand::thread_rng()));
let geth = geth.set_clique_private_key(signer.clone());
// spawn the geth instance
let instance = geth.spawn();
// create the signer
let wallet: LocalWallet = signer.clone().into();
// set up ethers provider
let geth_endpoint = SocketAddr::new([127, 0, 0, 1].into(), instance.port()).to_string();
let provider = Provider::<Ws>::connect(format!("ws://{geth_endpoint}")).await.unwrap();
let provider =
SignerMiddleware::new_with_provider_chain(provider, wallet.clone()).await.unwrap();
Self { instance, provider }
}
/// Prints the logs of the [`Geth`](ethers_core::utils::Geth) instance in a new
/// [`task`](tokio::task).
#[allow(dead_code)]
pub async fn print_logs(&mut self) {
// take the stderr of the geth instance and print it
let stderr = self.instance.stderr().unwrap();
// print logs in a new task
let mut err_reader = BufReader::new(stderr);
tokio::spawn(async move {
loop {
if let (Ok(line), line_str) = {
let mut buf = String::new();
(err_reader.read_line(&mut buf), buf.clone())
} {
if line == 0 {
break
}
if !line_str.is_empty() {
dbg!(line_str);
}
}
}
});
}
/// Prevents the [`Geth`](ethers_core::utils::Geth) instance from blocking due to the `stderr`
/// filling up.
pub async fn prevent_blocking(&mut self) {
// take the stderr of the geth instance and print it
let stderr = self.instance.stderr().unwrap();
// print logs in a new task
let mut err_reader = BufReader::new(stderr);
tokio::spawn(async move {
loop {
if let (Ok(line), _line_str) = {
let mut buf = String::new();
(err_reader.read_line(&mut buf), buf.clone())
} {
if line == 0 {
break
}
}
}
});
}
}

View File

@@ -0,0 +1,128 @@
//! Helper extension traits for working with clique providers.
use async_trait::async_trait;
use enr::k256::ecdsa::SigningKey;
use ethers_core::{
types::{transaction::eip2718::TypedTransaction, Address, Block, BlockNumber, H256},
utils::secret_key_to_address,
};
use ethers_middleware::SignerMiddleware;
use ethers_providers::Middleware;
use ethers_signers::Signer;
use reth_network::test_utils::enr_to_peer_id;
use reth_primitives::PeerId;
use thiserror::Error;
use tracing::trace;
/// An error that can occur when using the
/// [`CliqueMiddleware`](crate::test_utils::CliqueMiddleware).
#[derive(Error, Debug)]
pub enum CliqueError<E> {
/// Error encountered when using the provider
#[error(transparent)]
ProviderError(#[from] E),
/// No genesis block returned from the provider
#[error("no genesis block returned from the provider")]
NoGenesis,
/// No tip block returned from the provider
#[error("no tip block returned from the provider")]
NoTip,
/// Account was not successfully unlocked on the provider
#[error("account was not successfully unlocked on the provider")]
AccountNotUnlocked,
/// Mining was not successfully enabled on the provider
#[error("mining was not successfully enabled on the provider")]
MiningNotEnabled,
/// Mismatch between locally computed address and address returned from the provider
#[error("local address {local} does not match remote address {remote}")]
AddressMismatch {
/// The locally computed address
local: Address,
/// The address returned from the provider
remote: Address,
},
}
/// Error type for [`CliqueMiddleware`](crate::test_utils::CliqueMiddleware).
pub type CliqueMiddlewareError<M> = CliqueError<<M as Middleware>::Error>;
/// Extension trait for [`Middleware`](ethers_providers::Middleware) to provide clique specific
/// functionality.
#[async_trait(?Send)]
pub trait CliqueMiddleware: Send + Sync + Middleware {
/// Enable mining on the clique geth instance by importing and unlocking the signer account
/// derived from given private key and password.
async fn enable_mining(
&self,
signer: SigningKey,
password: String,
) -> Result<(), CliqueMiddlewareError<Self>> {
let our_address = secret_key_to_address(&signer);
// send the private key to geth and unlock it
let key_bytes = signer.to_bytes().to_vec().into();
trace!(
private_key=%hex::encode(&key_bytes),
"Importing private key"
);
let unlocked_addr = self.import_raw_key(key_bytes, password.to_string()).await?;
if unlocked_addr != our_address {
return Err(CliqueError::AddressMismatch { local: our_address, remote: unlocked_addr })
}
let unlock_success = self.unlock_account(our_address, password.to_string(), None).await?;
if !unlock_success {
return Err(CliqueError::AccountNotUnlocked)
}
// start mining?
self.start_mining(None).await?;
// check that we are mining
let mining = self.mining().await?;
if !mining {
return Err(CliqueError::MiningNotEnabled)
}
Ok(())
}
/// Returns the chain tip of the [`Geth`](ethers_core::utils::Geth) instance by calling
/// geth's `eth_getBlock`.
async fn remote_tip_block(&self) -> Result<Block<H256>, CliqueMiddlewareError<Self>> {
self.get_block(BlockNumber::Latest).await?.ok_or(CliqueError::NoTip)
}
/// Returns the genesis block of the [`Geth`](ethers_core::utils::Geth) instance by calling
/// geth's `eth_getBlock`.
async fn remote_genesis_block(&self) -> Result<Block<H256>, CliqueMiddlewareError<Self>> {
self.get_block(BlockNumber::Earliest).await?.ok_or(CliqueError::NoGenesis)
}
/// Signs and sends the given unsigned transactions sequentially, signing with the private key
/// used to configure the [`CliqueGethInstance`](crate::test_utils::CliqueGethInstance).
async fn send_requests<T: IntoIterator<Item = TypedTransaction>>(
&self,
txs: T,
) -> Result<(), CliqueMiddlewareError<Self>> {
for tx in txs {
self.send_transaction(tx, None).await?;
}
Ok(())
}
/// Returns the [`Geth`](ethers_core::utils::Geth) instance [`PeerId`](reth_primitives::PeerId)
/// by calling geth's `admin_nodeInfo`.
async fn peer_id(&self) -> Result<PeerId, CliqueMiddlewareError<Self>> {
Ok(enr_to_peer_id(self.node_info().await?.enr))
}
}
impl<M: Middleware, S: Signer> CliqueMiddleware for SignerMiddleware<M, S> {}

View File

@@ -0,0 +1,9 @@
#![warn(missing_docs, unreachable_pub)]
//! Common helpers for staged sync integration testing.
pub mod clique;
pub mod clique_middleware;
pub use clique::CliqueGethInstance;
pub use clique_middleware::{CliqueError, CliqueMiddleware, CliqueMiddlewareError};

View File

@@ -0,0 +1,140 @@
use ethers_core::{
types::{transaction::eip2718::TypedTransaction, Eip1559TransactionRequest, H160, U64},
utils::Geth,
};
use ethers_providers::Middleware;
use reth_network::{
test_utils::{unused_port, unused_tcp_udp, NetworkEventStream},
NetworkConfig, NetworkManager,
};
use reth_primitives::{
constants::EIP1559_INITIAL_BASE_FEE, ChainSpec, Hardfork, Header, PeerId, SealedHeader,
};
use reth_provider::test_utils::NoopProvider;
use reth_staged_sync::test_utils::{CliqueGethInstance, CliqueMiddleware};
use secp256k1::SecretKey;
use std::{net::SocketAddr, sync::Arc};
#[tokio::test(flavor = "multi_thread")]
async fn can_peer_with_geth() {
reth_tracing::init_test_tracing();
let (clique, chainspec) = init_geth().await;
let geth_p2p_port = clique.instance.p2p_port().unwrap();
// === initialize reth networking stack ===
let secret_key = SecretKey::new(&mut rand::thread_rng());
let (reth_p2p, reth_disc) = unused_tcp_udp();
tracing::info!(
%reth_p2p,
%reth_disc,
"setting up reth networking stack in keepalive test"
);
let config = NetworkConfig::<Arc<NoopProvider>>::builder(secret_key)
.listener_addr(reth_p2p)
.discovery_addr(reth_disc)
.chain_spec(chainspec)
.build(Arc::new(NoopProvider::default()));
let network = NetworkManager::new(config).await.unwrap();
let handle = network.handle().clone();
tokio::task::spawn(network);
// create networkeventstream to get the next session established event easily
let mut events = NetworkEventStream::new(handle.event_listener());
let geth_socket = SocketAddr::new([127, 0, 0, 1].into(), geth_p2p_port);
// get the peer id we should be expecting
let geth_peer_id: PeerId = clique.provider.peer_id().await.unwrap();
// add geth as a peer then wait for `PeerAdded` and `SessionEstablished` events.
handle.add_peer(geth_peer_id, geth_socket);
// wait for the session to be established
let peer_id = events.peer_added_and_established().await.unwrap();
assert_eq!(geth_peer_id, peer_id);
}
async fn init_geth() -> (CliqueGethInstance, ChainSpec) {
// first create a signer that we will fund so we can make transactions
let chain_id = 13337u64;
let data_dir = tempfile::tempdir().expect("should be able to create temp geth datadir");
let dir_path = data_dir.path();
tracing::info!(
data_dir=?dir_path,
"initializing geth instance"
);
// this creates a funded geth
let clique_geth =
Geth::new().chain_id(chain_id).p2p_port(unused_port()).data_dir(dir_path.to_str().unwrap());
// build the funded geth
let mut clique = CliqueGethInstance::new(clique_geth, None).await;
let geth_p2p_port =
clique.instance.p2p_port().expect("geth should be configured with a p2p port");
tracing::info!(
p2p_port=%geth_p2p_port,
rpc_port=%clique.instance.port(),
"configured clique geth instance in keepalive test"
);
// don't print logs, but drain the stderr
clique.prevent_blocking().await;
// get geth to start producing blocks - use a blank password
let clique_private_key = clique
.instance
.clique_private_key()
.clone()
.expect("clique should be configured with a private key");
clique.provider.enable_mining(clique_private_key, "".into()).await.unwrap();
// === check that we have the same genesis hash ===
// get the chainspec from the genesis we configured for geth
let mut chainspec: ChainSpec = clique
.instance
.genesis()
.clone()
.expect("clique should be configured with a genesis")
.into();
let remote_genesis = SealedHeader::from(clique.provider.remote_genesis_block().await.unwrap());
let mut local_genesis_header = Header::from(chainspec.genesis().clone());
let hardforks = chainspec.hardforks();
// set initial base fee depending on eip-1559
if let Some(0) = hardforks.get(&Hardfork::London) {
local_genesis_header.base_fee_per_gas = Some(EIP1559_INITIAL_BASE_FEE);
}
let local_genesis = local_genesis_header.seal();
assert_eq!(local_genesis, remote_genesis, "genesis blocks should match, we computed {local_genesis:#?} but geth computed {remote_genesis:#?}");
// set the chainspec genesis hash
chainspec.genesis_hash = local_genesis.hash();
// === create many blocks ===
let nonces = 0..1000u64;
let txs = nonces.map(|nonce| {
// create a tx that just sends to the zero addr
TypedTransaction::Eip1559(
Eip1559TransactionRequest::new().to(H160::zero()).value(1u64).nonce(nonce),
)
});
tracing::info!("generated transactions for blocks");
// finally send the txs to geth
clique.provider.send_requests(txs).await.unwrap();
let block = clique.provider.get_block_number().await.unwrap();
assert!(block > U64::zero());
(clique, chainspec)
}