mirror of
https://github.com/paradigmxyz/reth.git
synced 2026-01-30 01:28:21 -05:00
feat(net): add NetworkBuilder type (#421)
This commit is contained in:
59
crates/net/network/src/builder.rs
Normal file
59
crates/net/network/src/builder.rs
Normal file
@@ -0,0 +1,59 @@
|
||||
//! Builder support for configuring the entire setup.
|
||||
|
||||
use crate::{
|
||||
eth_requests::EthRequestHandler, transactions::TransactionsManager, NetworkHandle,
|
||||
NetworkManager,
|
||||
};
|
||||
use reth_transaction_pool::TransactionPool;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// A builder that can configure all components of the network.
|
||||
pub struct NetworkBuilder<C, Tx, Eth> {
|
||||
pub(crate) network: NetworkManager<C>,
|
||||
pub(crate) transactions: Tx,
|
||||
pub(crate) request_handler: Eth,
|
||||
}
|
||||
|
||||
// === impl NetworkBuilder ===
|
||||
|
||||
impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
|
||||
/// Consumes the type and returns all fields.
|
||||
pub fn split(self) -> (NetworkManager<C>, Tx, Eth) {
|
||||
let NetworkBuilder { network, transactions, request_handler } = self;
|
||||
(network, transactions, request_handler)
|
||||
}
|
||||
|
||||
/// Consumes the type and returns all fields and also return a [`NetworkHandle`].
|
||||
pub fn split_with_handle(self) -> (NetworkHandle, NetworkManager<C>, Tx, Eth) {
|
||||
let NetworkBuilder { network, transactions, request_handler } = self;
|
||||
let handle = network.handle().clone();
|
||||
(handle, network, transactions, request_handler)
|
||||
}
|
||||
|
||||
/// Creates a new [`TransactionsManager`] and wires it to the network.
|
||||
pub fn transactions<Pool: TransactionPool>(
|
||||
self,
|
||||
pool: Pool,
|
||||
) -> NetworkBuilder<C, TransactionsManager<Pool>, Eth> {
|
||||
let NetworkBuilder { mut network, request_handler, .. } = self;
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
network.set_transactions(tx);
|
||||
let handle = network.handle().clone();
|
||||
let transactions = TransactionsManager::new(handle, pool, rx);
|
||||
NetworkBuilder { network, request_handler, transactions }
|
||||
}
|
||||
|
||||
/// Creates a new [`EthRequestHandler`] and wires it to the network.
|
||||
pub fn request_handler<Client>(
|
||||
self,
|
||||
client: Arc<Client>,
|
||||
) -> NetworkBuilder<C, Tx, EthRequestHandler<Client>> {
|
||||
let NetworkBuilder { mut network, transactions, .. } = self;
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
network.set_eth_request_handler(tx);
|
||||
let peers = network.handle().peers_handle().clone();
|
||||
let request_handler = EthRequestHandler::new(client, peers, rx);
|
||||
NetworkBuilder { network, request_handler, transactions }
|
||||
}
|
||||
}
|
||||
@@ -59,11 +59,7 @@ pub struct EthRequestHandler<C> {
|
||||
}
|
||||
|
||||
// === impl EthRequestHandler ===
|
||||
|
||||
impl<C> EthRequestHandler<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
impl<C> EthRequestHandler<C> {
|
||||
/// Create a new instance
|
||||
pub fn new(
|
||||
client: Arc<C>,
|
||||
@@ -72,7 +68,12 @@ where
|
||||
) -> Self {
|
||||
Self { client, peers, incoming_requests: UnboundedReceiverStream::new(incoming) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> EthRequestHandler<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
/// Returns the list of requested heders
|
||||
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<Header> {
|
||||
let GetBlockHeaders { start_block, limit, skip, direction } = request;
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
//!
|
||||
//! ## Usage
|
||||
//!
|
||||
//! ### Configure and launch the network
|
||||
//! ### Configure and launch a standalone network
|
||||
//!
|
||||
//! The [`NetworkConfig`] is used to configure the network.
|
||||
//! It requires an instance of [`BlockProvider`](reth_provider::BlockProvider).
|
||||
@@ -48,7 +48,37 @@
|
||||
//!
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ### Configure all components of the Network with the [`NetworkBuilder`]
|
||||
//!
|
||||
//! ```
|
||||
//! use reth_provider::test_utils::TestApi;
|
||||
//! use reth_transaction_pool::TransactionPool;
|
||||
//! use std::sync::Arc;
|
||||
//! use reth_discv4::bootnodes::mainnet_nodes;
|
||||
//! use reth_network::config::rng_secret_key;
|
||||
//! use reth_network::{NetworkConfig, NetworkManager};
|
||||
//! async fn launch<Pool: TransactionPool>(pool: Pool) {
|
||||
//! // This block provider implementation is used for testing purposes.
|
||||
//! let client = Arc::new(TestApi::default());
|
||||
//!
|
||||
//! // The key that's used for encrypting sessions and to identify our node.
|
||||
//! let local_key = rng_secret_key();
|
||||
//!
|
||||
//! let config =
|
||||
//! NetworkConfig::builder(Arc::clone(&client), local_key).boot_nodes(mainnet_nodes()).build();
|
||||
//!
|
||||
//! // create the network instance
|
||||
//! let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
|
||||
//! .await
|
||||
//! .unwrap()
|
||||
//! .transactions(pool)
|
||||
//! .request_handler(client)
|
||||
//! .split_with_handle();
|
||||
//! }
|
||||
//! ```
|
||||
|
||||
mod builder;
|
||||
mod cache;
|
||||
pub mod config;
|
||||
mod discovery;
|
||||
@@ -66,6 +96,7 @@ mod state;
|
||||
mod swarm;
|
||||
pub mod transactions;
|
||||
|
||||
pub use builder::NetworkBuilder;
|
||||
pub use config::NetworkConfig;
|
||||
pub use fetch::FetchClient;
|
||||
pub use manager::{NetworkEvent, NetworkManager};
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
state::NetworkState,
|
||||
swarm::{Swarm, SwarmEvent},
|
||||
transactions::NetworkTransactionEvent,
|
||||
FetchClient,
|
||||
FetchClient, NetworkBuilder,
|
||||
};
|
||||
use futures::{Future, StreamExt};
|
||||
use parking_lot::Mutex;
|
||||
@@ -102,6 +102,26 @@ pub struct NetworkManager<C> {
|
||||
}
|
||||
|
||||
// === impl NetworkManager ===
|
||||
impl<C> NetworkManager<C> {
|
||||
/// Sets the dedicated channel for events indented for the
|
||||
/// [`TransactionsManager`](crate::transactions::TransactionsManager).
|
||||
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
|
||||
self.to_transactions_manager = Some(tx);
|
||||
}
|
||||
|
||||
/// Sets the dedicated channel for events indented for the
|
||||
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
|
||||
pub fn set_eth_request_handler(&mut self, tx: mpsc::UnboundedSender<IncomingEthRequest>) {
|
||||
self.to_eth_request_handler = Some(tx);
|
||||
}
|
||||
|
||||
/// Returns the [`NetworkHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
|
||||
pub fn handle(&self) -> &NetworkHandle {
|
||||
&self.handle
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> NetworkManager<C>
|
||||
where
|
||||
@@ -169,16 +189,45 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Sets the dedicated channel for events indented for the
|
||||
/// [`TransactionsManager`](crate::transactions::TransactionsManager).
|
||||
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
|
||||
self.to_transactions_manager = Some(tx);
|
||||
/// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
|
||||
/// components of the network
|
||||
///
|
||||
/// ```
|
||||
/// use reth_provider::test_utils::TestApi;
|
||||
/// use reth_transaction_pool::TransactionPool;
|
||||
/// use std::sync::Arc;
|
||||
/// use reth_discv4::bootnodes::mainnet_nodes;
|
||||
/// use reth_network::config::rng_secret_key;
|
||||
/// use reth_network::{NetworkConfig, NetworkManager};
|
||||
/// async fn launch<Pool: TransactionPool>(pool: Pool) {
|
||||
/// // This block provider implementation is used for testing purposes.
|
||||
/// let client = Arc::new(TestApi::default());
|
||||
///
|
||||
/// // The key that's used for encrypting sessions and to identify our node.
|
||||
/// let local_key = rng_secret_key();
|
||||
///
|
||||
/// let config =
|
||||
/// NetworkConfig::builder(Arc::clone(&client), local_key).boot_nodes(mainnet_nodes()).build();
|
||||
///
|
||||
/// // create the network instance
|
||||
/// let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
|
||||
/// .await
|
||||
/// .unwrap()
|
||||
/// .transactions(pool)
|
||||
/// .request_handler(client)
|
||||
/// .split_with_handle();
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn builder(
|
||||
config: NetworkConfig<C>,
|
||||
) -> Result<NetworkBuilder<C, (), ()>, NetworkError> {
|
||||
let network = Self::new(config).await?;
|
||||
Ok(network.into_builder())
|
||||
}
|
||||
|
||||
/// Sets the dedicated channel for events indented for the
|
||||
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
|
||||
pub fn set_eth_request_handler(&mut self, tx: mpsc::UnboundedSender<IncomingEthRequest>) {
|
||||
self.to_eth_request_handler = Some(tx);
|
||||
/// Create a [`NetworkBuilder`] to configure all components of the network
|
||||
pub fn into_builder(self) -> NetworkBuilder<C, (), ()> {
|
||||
NetworkBuilder { network: self, transactions: (), request_handler: () }
|
||||
}
|
||||
|
||||
/// Returns the [`SocketAddr`] that listens for incoming connections.
|
||||
@@ -201,13 +250,6 @@ where
|
||||
self.handle.peer_id()
|
||||
}
|
||||
|
||||
/// Returns the [`NetworkHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
|
||||
pub fn handle(&self) -> &NetworkHandle {
|
||||
&self.handle
|
||||
}
|
||||
|
||||
/// Returns a new [`PeersHandle`] that can be cloned and shared.
|
||||
///
|
||||
/// The [`PeersHandle`] can be used to interact with the network's peer set.
|
||||
|
||||
@@ -21,6 +21,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::trace;
|
||||
|
||||
/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set.
|
||||
#[derive(Clone)]
|
||||
pub struct PeersHandle {
|
||||
/// Sender half of command channel back to the [`PeersManager`]
|
||||
manager_tx: mpsc::UnboundedSender<PeerCommand>,
|
||||
|
||||
@@ -103,13 +103,7 @@ pub struct TransactionsManager<Pool> {
|
||||
transaction_events: UnboundedReceiverStream<NetworkTransactionEvent>,
|
||||
}
|
||||
|
||||
// === impl TransactionsManager ===
|
||||
|
||||
impl<Pool> TransactionsManager<Pool>
|
||||
where
|
||||
Pool: TransactionPool + 'static,
|
||||
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
|
||||
{
|
||||
impl<Pool: TransactionPool> TransactionsManager<Pool> {
|
||||
/// Sets up a new instance.
|
||||
///
|
||||
/// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
|
||||
@@ -138,7 +132,15 @@ where
|
||||
transaction_events: UnboundedReceiverStream::new(from_network),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl TransactionsManager ===
|
||||
|
||||
impl<Pool> TransactionsManager<Pool>
|
||||
where
|
||||
Pool: TransactionPool + 'static,
|
||||
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
|
||||
{
|
||||
/// Returns a new handle that can send commands to this type.
|
||||
pub fn handle(&self) -> TransactionsHandle {
|
||||
TransactionsHandle { manager_tx: self.command_tx.clone() }
|
||||
|
||||
Reference in New Issue
Block a user