feat: rlpx multiplexing (#5413)

This commit is contained in:
Matthias Seitz
2023-11-22 21:30:18 +01:00
committed by GitHub
parent 5e2affb15a
commit 3598a23cf7
5 changed files with 548 additions and 61 deletions

View File

@@ -20,6 +20,7 @@ mod disconnect;
pub mod errors;
mod ethstream;
mod hello;
pub mod multiplex;
mod p2pstream;
mod pinger;
pub mod protocol;
@@ -27,6 +28,9 @@ pub use builder::*;
pub mod types;
pub use types::*;
#[cfg(test)]
pub mod test_utils;
#[cfg(test)]
pub use tokio_util::codec::{
LengthDelimitedCodec as PassthroughCodec, LengthDelimitedCodecError as PassthroughCodecError,

View File

@@ -0,0 +1,459 @@
//! Rlpx protocol multiplexer and satellite stream
//!
//! A Satellite is a Stream that primarily drives a single RLPx subprotocol but can also handle
//! additional subprotocols.
//!
//! Most of other subprotocols are "dependent satellite" protocols of "eth" and not a fully standalone protocol, for example "snap", See also [snap protocol](https://github.com/ethereum/devp2p/blob/298d7a77c3bf833641579ecbbb5b13f0311eeeea/caps/snap.md?plain=1#L71)
//! Hence it is expected that the primary protocol is "eth" and the additional protocols are
//! "dependent satellite" protocols.
use std::{
collections::VecDeque,
fmt,
future::Future,
io,
pin::Pin,
task::{ready, Context, Poll},
};
use bytes::{Bytes, BytesMut};
use futures::{pin_mut, Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
use tokio::sync::{mpsc, mpsc::UnboundedSender};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
capability::{Capability, SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
errors::P2PStreamError,
CanDisconnect, DisconnectReason, P2PStream,
};
/// A Stream and Sink type that wraps a raw rlpx stream [P2PStream] and handles message ID
/// multiplexing.
#[derive(Debug)]
pub struct RlpxProtocolMultiplexer<St> {
/// The raw p2p stream
conn: P2PStream<St>,
/// All the subprotocols that are multiplexed on top of the raw p2p stream
protocols: Vec<ProtocolStream>,
}
impl<St> RlpxProtocolMultiplexer<St> {
/// Wraps the raw p2p stream
pub fn new(conn: P2PStream<St>) -> Self {
Self { conn, protocols: Default::default() }
}
/// Installs a new protocol on top of the raw p2p stream
pub fn install_protocol<S>(
&mut self,
_cap: Capability,
_st: S,
) -> Result<(), UnsupportedCapabilityError> {
todo!()
}
/// Returns the [SharedCapabilities] of the underlying raw p2p stream
pub fn shared_capabilities(&self) -> &SharedCapabilities {
self.conn.shared_capabilities()
}
/// Converts this multiplexer into a [RlpxSatelliteStream] with the given primary protocol.
///
/// Returns an error if the primary protocol is not supported by the remote or the handshake
/// failed.
pub async fn into_satellite_stream_with_handshake<F, Fut, Err, Primary>(
mut self,
cap: &Capability,
handshake: F,
) -> Result<RlpxSatelliteStream<St, Primary>, Self>
where
F: FnOnce(ProtocolProxy) -> Fut,
Fut: Future<Output = Result<Primary, Err>>,
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
{
let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
else {
return Err(self)
};
let (to_primary, from_wire) = mpsc::unbounded_channel();
let (to_wire, mut from_primary) = mpsc::unbounded_channel();
let proxy = ProtocolProxy {
cap: shared_cap.clone(),
from_wire: UnboundedReceiverStream::new(from_wire),
to_wire,
};
let f = handshake(proxy);
pin_mut!(f);
// handle messages until the handshake is complete
loop {
// TODO error handling
tokio::select! {
Some(Ok(msg)) = self.conn.next() => {
// TODO handle multiplex
let _ = to_primary.send(msg);
}
Some(msg) = from_primary.recv() => {
// TODO error handling
self.conn.send(msg).await.unwrap();
}
res = &mut f => {
let Ok(primary) = res else { return Err(self) };
return Ok(RlpxSatelliteStream {
conn: self.conn,
to_primary,
from_primary: UnboundedReceiverStream::new(from_primary),
primary,
primary_capability: shared_cap,
satellites: self.protocols,
out_buffer: Default::default(),
})
}
}
}
}
}
/// A Stream and Sink type that acts as a wrapper around a primary RLPx subprotocol (e.g. "eth")
#[derive(Debug)]
pub struct ProtocolProxy {
cap: SharedCapability,
from_wire: UnboundedReceiverStream<BytesMut>,
to_wire: UnboundedSender<Bytes>,
}
impl ProtocolProxy {
fn mask_msg_id(&self, msg: Bytes) -> Bytes {
// TODO handle empty messages
let mut masked_bytes = BytesMut::zeroed(msg.len());
masked_bytes[0] = msg[0] + self.cap.relative_message_id_offset();
masked_bytes[1..].copy_from_slice(&msg[1..]);
masked_bytes.freeze()
}
fn unmask_id(&self, mut msg: BytesMut) -> BytesMut {
// TODO handle empty messages
msg[0] -= self.cap.relative_message_id_offset();
msg
}
}
impl Stream for ProtocolProxy {
type Item = Result<BytesMut, io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let msg = ready!(self.from_wire.poll_next_unpin(cx));
Poll::Ready(msg.map(|msg| Ok(self.get_mut().unmask_id(msg))))
}
}
impl Sink<Bytes> for ProtocolProxy {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let msg = self.mask_msg_id(item);
self.to_wire.send(msg).map_err(|_| io::ErrorKind::BrokenPipe.into())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
#[async_trait::async_trait]
impl CanDisconnect<Bytes> for ProtocolProxy {
async fn disconnect(
&mut self,
_reason: DisconnectReason,
) -> Result<(), <Self as Sink<Bytes>>::Error> {
// TODO handle disconnects
Ok(())
}
}
/// A connection channel to receive messages for the negotiated protocol.
///
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
from_wire: UnboundedReceiverStream<BytesMut>,
}
impl Stream for ProtocolConnection {
type Item = BytesMut;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.from_wire.poll_next_unpin(cx)
}
}
/// A Stream and Sink type that acts as a wrapper around a primary RLPx subprotocol (e.g. "eth")
/// [EthStream](crate::EthStream) and can also handle additional subprotocols.
#[derive(Debug)]
pub struct RlpxSatelliteStream<St, Primary> {
/// The raw p2p stream
conn: P2PStream<St>,
to_primary: UnboundedSender<BytesMut>,
from_primary: UnboundedReceiverStream<Bytes>,
primary: Primary,
primary_capability: SharedCapability,
satellites: Vec<ProtocolStream>,
out_buffer: VecDeque<Bytes>,
}
impl<St, Primary> RlpxSatelliteStream<St, Primary> {}
impl<St, Primary, PrimaryErr> Stream for RlpxSatelliteStream<St, Primary>
where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
Primary: TryStream<Error = PrimaryErr> + Unpin,
P2PStreamError: Into<PrimaryErr>,
{
type Item = Result<Primary::Ok, Primary::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
// first drain the primary stream
if let Poll::Ready(Some(msg)) = this.primary.try_poll_next_unpin(cx) {
return Poll::Ready(Some(msg))
}
let mut out_ready = true;
loop {
match this.conn.poll_ready_unpin(cx) {
Poll::Ready(_) => {
if let Some(msg) = this.out_buffer.pop_front() {
if let Err(err) = this.conn.start_send_unpin(msg) {
return Poll::Ready(Some(Err(err.into())))
}
} else {
break;
}
}
Poll::Pending => {
out_ready = false;
break
}
}
}
// advance primary out
loop {
match this.from_primary.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => {
this.out_buffer.push_back(msg);
}
Poll::Ready(None) => {
// primary closed
return Poll::Ready(None)
}
Poll::Pending => break,
}
}
// advance all satellites
for idx in (0..this.satellites.len()).rev() {
let mut proto = this.satellites.swap_remove(idx);
loop {
match proto.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => {
this.out_buffer.push_back(msg);
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {
this.satellites.push(proto);
break
}
}
}
}
let mut delegated = false;
loop {
// pull messages from connection
match this.conn.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(msg))) => {
delegated = true;
let offset = msg[0];
// find the protocol that matches the offset
// TODO optimize this by keeping a better index
let mut lowest_satellite = None;
// find the protocol with the lowest offset that is greater than the message
// offset
for (i, proto) in this.satellites.iter().enumerate() {
let proto_offset = proto.cap.relative_message_id_offset();
if proto_offset >= offset {
if let Some((_, lowest_offset)) = lowest_satellite {
if proto_offset < lowest_offset {
lowest_satellite = Some((i, proto_offset));
}
} else {
lowest_satellite = Some((i, proto_offset));
}
}
}
if let Some((idx, lowest_offset)) = lowest_satellite {
if lowest_offset < this.primary_capability.relative_message_id_offset()
{
// delegate to satellite
this.satellites[idx].send_raw(msg);
continue
}
}
// delegate to primary
let _ = this.to_primary.send(msg);
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
Poll::Ready(None) => {
// connection closed
return Poll::Ready(None)
}
Poll::Pending => break,
}
}
if !delegated || !out_ready || this.out_buffer.is_empty() {
return Poll::Pending
}
}
}
}
impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
Primary: Sink<T, Error = io::Error> + Unpin,
P2PStreamError: Into<<Primary as Sink<T>>::Error>,
{
type Error = <Primary as Sink<T>>::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
if let Err(err) = ready!(this.conn.poll_ready_unpin(cx)) {
return Poll::Ready(Err(err.into()))
}
if let Err(err) = ready!(this.primary.poll_ready_unpin(cx)) {
return Poll::Ready(Err(err))
}
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.get_mut().primary.start_send_unpin(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().conn.poll_flush_unpin(cx).map_err(Into::into)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().conn.poll_close_unpin(cx).map_err(Into::into)
}
}
/// Wraps a RLPx subprotocol and handles message ID multiplexing.
struct ProtocolStream {
cap: SharedCapability,
/// the channel shared with the satellite stream
to_satellite: UnboundedSender<BytesMut>,
satellite_st: Pin<Box<dyn Stream<Item = BytesMut>>>,
}
impl ProtocolStream {
fn mask_msg_id(&self, mut msg: BytesMut) -> Bytes {
// TODO handle empty messages
msg[0] += self.cap.relative_message_id_offset();
msg.freeze()
}
fn unmask_id(&self, mut msg: BytesMut) -> BytesMut {
// TODO handle empty messages
msg[0] -= self.cap.relative_message_id_offset();
msg
}
fn send_raw(&self, msg: BytesMut) {
let _ = self.to_satellite.send(self.unmask_id(msg));
}
}
impl Stream for ProtocolStream {
type Item = Bytes;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let msg = ready!(this.satellite_st.as_mut().poll_next(cx));
Poll::Ready(msg.map(|msg| this.mask_msg_id(msg)))
}
}
impl fmt::Debug for ProtocolStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProtocolStream").field("cap", &self.cap).finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use tokio::net::TcpListener;
use tokio_util::codec::Decoder;
use crate::{
test_utils::{connect_passthrough, eth_handshake, eth_hello},
UnauthedEthStream, UnauthedP2PStream,
};
use super::*;
#[tokio::test]
async fn eth_satellite() {
reth_tracing::init_test_tracing();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let (status, fork_filter) = eth_handshake();
let other_status = status;
let other_fork_filter = fork_filter.clone();
let _handle = tokio::spawn(async move {
let (incoming, _) = listener.accept().await.unwrap();
let stream = crate::PassthroughCodec::default().framed(incoming);
let (server_hello, _) = eth_hello();
let (p2p_stream, _) =
UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();
let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream)
.handshake(other_status, other_fork_filter)
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
});
let conn = connect_passthrough(local_addr, eth_hello().0).await;
let eth = conn.shared_capabilities().eth().unwrap().clone();
let multiplexer = RlpxProtocolMultiplexer::new(conn);
let _satellite = multiplexer
.into_satellite_stream_with_handshake(
eth.capability().as_ref(),
move |proxy| async move {
UnauthedEthStream::new(proxy).handshake(status, fork_filter).await
},
)
.await
.unwrap();
}
}

View File

@@ -1,19 +1,5 @@
#![allow(dead_code, unreachable_pub, missing_docs, unused_variables)]
use crate::{
disconnect::CanDisconnect,
errors::{P2PHandshakeError, P2PStreamError},
pinger::{Pinger, PingerEvent},
DisconnectReason, HelloMessage, HelloMessageWithProtocols,
};
use alloy_rlp::{Decodable, Encodable, Error as RlpError, EMPTY_LIST_CODE};
use futures::{Sink, SinkExt, StreamExt};
use pin_project::pin_project;
use reth_codecs::derive_arbitrary;
use reth_metrics::metrics::counter;
use reth_primitives::{
bytes::{Buf, BufMut, Bytes, BytesMut},
hex, GotExpected,
};
use std::{
collections::VecDeque,
fmt, io,
@@ -21,13 +7,30 @@ use std::{
task::{ready, Context, Poll},
time::Duration,
};
use tokio_stream::Stream;
use crate::capability::SharedCapabilities;
use alloy_rlp::{Decodable, Encodable, Error as RlpError, EMPTY_LIST_CODE};
use futures::{Sink, SinkExt, StreamExt};
use pin_project::pin_project;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use tokio_stream::Stream;
use tracing::{debug, trace};
use reth_codecs::derive_arbitrary;
use reth_metrics::metrics::counter;
use reth_primitives::{
bytes::{Buf, BufMut, Bytes, BytesMut},
hex, GotExpected,
};
use crate::{
capability::SharedCapabilities,
disconnect::CanDisconnect,
errors::{P2PHandshakeError, P2PStreamError},
pinger::{Pinger, PingerEvent},
DisconnectReason, HelloMessage, HelloMessageWithProtocols,
};
/// [`MAX_PAYLOAD_SIZE`] is the maximum size of an uncompressed message payload.
/// This is defined in [EIP-706](https://eips.ethereum.org/EIPS/eip-706).
const MAX_PAYLOAD_SIZE: usize = 16 * 1024 * 1024;
@@ -785,27 +788,12 @@ impl Decodable for ProtocolVersion {
#[cfg(test)]
mod tests {
use super::*;
use crate::{capability::SharedCapability, DisconnectReason, EthVersion};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_ecies::util::pk2id;
use secp256k1::{SecretKey, SECP256K1};
use crate::{
capability::SharedCapability, test_utils::eth_hello, DisconnectReason, EthVersion,
};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::codec::Decoder;
/// Returns a testing `HelloMessage` and new secretkey
fn eth_hello() -> (HelloMessageWithProtocols, SecretKey) {
let server_key = SecretKey::new(&mut rand::thread_rng());
let protocols = vec![EthVersion::Eth67.into()];
let hello = HelloMessageWithProtocols {
protocol_version: ProtocolVersion::V5,
client_version: "bitcoind/1.0.0".to_string(),
protocols,
port: DEFAULT_DISCOVERY_PORT,
id: pk2id(&server_key.public_key(SECP256K1)),
};
(hello, server_key)
}
#[tokio::test]
async fn test_can_disconnect() {
reth_tracing::init_test_tracing();

View File

@@ -0,0 +1,57 @@
//! Utilities for testing p2p protocol.
use crate::{
EthVersion, HelloMessageWithProtocols, P2PStream, ProtocolVersion, Status, UnauthedP2PStream,
};
use reth_discv4::DEFAULT_DISCOVERY_PORT;
use reth_ecies::util::pk2id;
use reth_primitives::{Chain, ForkFilter, Head, B256, U256};
use secp256k1::{SecretKey, SECP256K1};
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_util::codec::{Decoder, Framed, LengthDelimitedCodec};
pub type P2pPassthroughTcpStream = P2PStream<Framed<TcpStream, LengthDelimitedCodec>>;
/// Returns a new testing `HelloMessage` and new secretkey
pub fn eth_hello() -> (HelloMessageWithProtocols, SecretKey) {
let server_key = SecretKey::new(&mut rand::thread_rng());
let protocols = vec![EthVersion::Eth67.into()];
let hello = HelloMessageWithProtocols {
protocol_version: ProtocolVersion::V5,
client_version: "eth/1.0.0".to_string(),
protocols,
port: DEFAULT_DISCOVERY_PORT,
id: pk2id(&server_key.public_key(SECP256K1)),
};
(hello, server_key)
}
/// Returns testing eth handshake status and fork filter.
pub fn eth_handshake() -> (Status, ForkFilter) {
let genesis = B256::random();
let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
let status = Status {
version: EthVersion::Eth67 as u8,
chain: Chain::mainnet(),
total_difficulty: U256::ZERO,
blockhash: B256::random(),
genesis,
// Pass the current fork id.
forkid: fork_filter.current(),
};
(status, fork_filter)
}
/// Connects to a remote node and returns an authenticated `P2PStream` with the remote node.
pub async fn connect_passthrough(
addr: SocketAddr,
client_hello: HelloMessageWithProtocols,
) -> P2pPassthroughTcpStream {
let outgoing = TcpStream::connect(addr).await.unwrap();
let sink = crate::PassthroughCodec::default().framed(outgoing);
let (p2p_stream, _) = UnauthedP2PStream::new(sink).handshake(client_hello).await.unwrap();
p2p_stream
}

View File

@@ -2,19 +2,14 @@
//!
//! See also <https://github.com/ethereum/devp2p/blob/master/README.md>
use futures::{Stream, StreamExt};
use reth_eth_wire::{capability::SharedCapabilities, protocol::Protocol};
use futures::Stream;
use reth_eth_wire::{
capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
};
use reth_network_api::Direction;
use reth_primitives::BytesMut;
use reth_rpc_types::PeerId;
use std::{
fmt,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use std::{fmt, net::SocketAddr, pin::Pin};
/// A trait that allows to offer additional RLPx-based application-level protocols when establishing
/// a peer-to-peer connection.
@@ -81,22 +76,6 @@ pub enum OnNotSupported {
Disconnect,
}
/// A connection channel to receive messages for the negotiated protocol.
///
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
from_wire: UnboundedReceiverStream<BytesMut>,
}
impl Stream for ProtocolConnection {
type Item = BytesMut;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.from_wire.poll_next_unpin(cx)
}
}
/// A wrapper type for a RLPx sub-protocol.
#[derive(Debug)]
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);