docs(eth-wire): update docs to reflect eth-wire-types, alloy_rlp, version-aware decoding, and RLPx multiplexing (#19319)

This commit is contained in:
radik878
2025-10-27 13:58:55 +02:00
committed by GitHub
parent 106ffefc0f
commit f088ec09cb

View File

@@ -9,48 +9,70 @@ This crate can be thought of as having 2 components:
2. Abstractions over Tokio Streams that operate on these types.
(Note that ECIES is implemented in a separate `reth-ecies` crate.)
Additionally, this crate focuses on stream implementations (P2P and Eth), handshakes, and multiplexing. The protocol
message types and RLP encoding/decoding live in the separate `eth-wire-types` crate and are re-exported by `eth-wire`
for convenience.
## Types
The most basic Eth-wire type is a `ProtocolMessage`. It describes all messages that reth can send/receive.
[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
```rust, ignore
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProtocolMessage {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
pub message_type: EthMessageID,
pub message: EthMessage,
pub message: EthMessage<N>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EthMessage {
Status(Status),
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
Status(StatusMessage),
NewBlockHashes(NewBlockHashes),
Transactions(Transactions),
NewPooledTransactionHashes(NewPooledTransactionHashes),
NewBlock(Box<N::NewBlockPayload>),
Transactions(Transactions<N::BroadcastedTransaction>),
NewPooledTransactionHashes66(NewPooledTransactionHashes66),
NewPooledTransactionHashes68(NewPooledTransactionHashes68),
GetBlockHeaders(RequestPair<GetBlockHeaders>),
// ...
BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
GetBlockBodies(RequestPair<GetBlockBodies>),
BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
GetPooledTransactions(RequestPair<GetPooledTransactions>),
PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
GetNodeData(RequestPair<GetNodeData>),
NodeData(RequestPair<NodeData>),
GetReceipts(RequestPair<GetReceipts>),
Receipts(RequestPair<Receipts>),
Receipts(RequestPair<Receipts<N::Receipt>>),
Receipts69(RequestPair<Receipts69<N::Receipt>>),
BlockRangeUpdate(BlockRangeUpdate),
}
/// Represents message IDs for eth protocol messages.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EthMessageID {
Status = 0x00,
NewBlockHashes = 0x01,
Transactions = 0x02,
// ...
GetBlockHeaders = 0x03,
BlockHeaders = 0x04,
GetBlockBodies = 0x05,
BlockBodies = 0x06,
NewBlock = 0x07,
NewPooledTransactionHashes = 0x08,
GetPooledTransactions = 0x09,
PooledTransactions = 0x0a,
GetNodeData = 0x0d,
NodeData = 0x0e,
GetReceipts = 0x0f,
Receipts = 0x10,
BlockRangeUpdate = 0x11,
}
```
Messages can either be broadcast to the network, or can be a request/response message to a single peer. This 2nd type of message is
described using a `RequestPair` struct, which is simply a concatenation of the underlying message with a request id.
[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
```rust, ignore
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RequestPair<T> {
@@ -59,10 +81,8 @@ pub struct RequestPair<T> {
}
```
Every `Ethmessage` has a corresponding rust struct that implements the `Encodable` and `Decodable` traits.
These traits are defined as follows:
[Crate: crates/rlp](https://github.com/paradigmxyz/reth/tree/1563506aea09049a85e5cc72c2894f3f7a371581/crates/rlp)
Every `EthMessage` has a corresponding Rust struct that implements `alloy_rlp::Encodable` and `alloy_rlp::Decodable`
(often via derive macros like `RlpEncodable`/`RlpDecodable`). These traits are defined in `alloy_rlp`:
```rust, ignore
pub trait Decodable: Sized {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self>;
@@ -72,10 +92,11 @@ pub trait Encodable {
fn length(&self) -> usize;
}
```
These traits describe how the `Ethmessage` should be serialized/deserialized into raw bytes using the RLP format.
In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by the `common/rlp` and `common/rlp-derive` crates.
These traits describe how the `EthMessage` should be serialized/deserialized into raw bytes using the RLP format.
In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by `alloy_rlp` and the derive macros used in `eth-wire-types`.
Note that the `ProtocolMessage` itself implements these traits, so any stream of bytes can be converted into it by calling `ProtocolMessage::decode()` and vice versa with `ProtocolMessage::encode()`. The message type is determined by the first byte of the byte stream.
Note: `ProtocolMessage` implements `Encodable`, while decoding is performed via
`ProtocolMessage::decode_message(version, &mut bytes)` because decoding must respect the negotiated `EthVersion`.
### Example: The Transactions message
Let's understand how an `EthMessage` is implemented by taking a look at the `Transactions` Message. The eth specification describes a Transaction message as a list of RLP-encoded transactions:
@@ -93,17 +114,17 @@ The items in the list are transactions in the format described in the main Ether
In reth, this is represented as:
[File: crates/net/eth-wire/src/types/broadcast.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/broadcast.rs)
[File: crates/net/eth-wire-types/src/broadcast.rs](../../crates/net/eth-wire-types/src/broadcast.rs)
```rust,ignore
pub struct Transactions(
pub struct Transactions<T = TransactionSigned>(
/// New transactions for the peer to include in its mempool.
pub Vec<TransactionSigned>,
pub Vec<T>,
);
```
And the corresponding trait implementations are present in the primitives crate.
And the corresponding transaction type is defined here:
[File: crates/primitives/src/transaction/mod.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/primitives/src/transaction/mod.rs)
[File: crates/ethereum/primitives/src/transaction.rs](../../crates/ethereum/primitives/src/transaction.rs)
```rust, ignore
#[reth_codec]
#[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)]
@@ -146,7 +167,7 @@ The lowest level stream to communicate with other peers is the P2P stream. It ta
Decompression/Compression of bytes is done with snappy algorithm ([EIP 706](https://eips.ethereum.org/EIPS/eip-706))
using the external `snap` crate.
[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore
#[pin_project]
pub struct P2PStream<S> {
@@ -155,23 +176,29 @@ pub struct P2PStream<S> {
encoder: snap::raw::Encoder,
decoder: snap::raw::Decoder,
pinger: Pinger,
shared_capability: SharedCapability,
/// Negotiated shared capabilities
shared_capabilities: SharedCapabilities,
/// Outgoing messages buffered for sending to the underlying stream.
outgoing_messages: VecDeque<Bytes>,
/// Maximum number of messages that can be buffered before yielding backpressure.
outgoing_message_buffer_capacity: usize,
/// Whether this stream is currently in the process of gracefully disconnecting.
disconnecting: bool,
}
```
### Pinger
To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of how many pings
we have sent/received and the timeouts associated with them.
To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of pings
we have sent/received and the timeout associated with them.
[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/pinger.rs)
[File: crates/net/eth-wire/src/pinger.rs](../../crates/net/eth-wire/src/pinger.rs)
```rust,ignore
#[derive(Debug)]
pub(crate) struct Pinger {
/// The timer used for the next ping.
ping_interval: Interval,
/// The timer used for the next ping.
/// The timer used to detect a ping timeout.
timeout_timer: Pin<Box<Sleep>>,
/// The timeout duration for each ping.
timeout: Duration,
state: PingState,
}
@@ -205,7 +232,7 @@ pub(crate) fn poll_ping(
}
}
PingState::WaitingForPong => {
if self.timeout_timer.is_elapsed() {
if self.timeout_timer.as_mut().poll(cx).is_ready() {
self.state = PingState::TimedOut;
return Poll::Ready(Ok(PingerEvent::Timeout))
}
@@ -223,7 +250,7 @@ To send and receive data, the P2PStream itself is a future that implements the `
For the `Stream` trait, the `inner` stream is polled, decompressed and returned. Most of the code is just
error handling and is omitted here for clarity.
[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust,ignore
impl<S> Stream for P2PStream<S> {
@@ -240,7 +267,8 @@ impl<S> Stream for P2PStream<S> {
let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1);
this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?;
// ... Omitted Error handling
decompress_buf[0] = bytes[0] - this.shared_capability.offset();
// Normalize IDs: reserved p2p range is 0x00..=0x0f; subprotocols start at 0x10
decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1;
return Poll::Ready(Some(Ok(decompress_buf)))
}
}
@@ -250,7 +278,7 @@ impl<S> Stream for P2PStream<S> {
Similarly, for the `Sink` trait, we do the reverse, compressing and sending data out to the `inner` stream.
The important functions in this trait are shown below.
[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
impl<S> Sink<Bytes> for P2PStream<S> {
fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
@@ -258,7 +286,8 @@ impl<S> Sink<Bytes> for P2PStream<S> {
let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?;
compressed.truncate(compressed_size + 1);
compressed[0] = item[0] + this.shared_capability.offset();
// Mask subprotocol IDs into global space above reserved p2p IDs
compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1;
this.outgoing_messages.push_back(compressed.freeze());
Ok(())
}
@@ -285,9 +314,9 @@ impl<S> Sink<Bytes> for P2PStream<S> {
## EthStream
The EthStream is very simple, it does not keep track of any state, it simply wraps the P2Pstream.
The EthStream wraps a stream and handles eth message (RLP) encoding/decoding with respect to the negotiated `EthVersion`.
[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
#[pin_project]
pub struct EthStream<S> {
@@ -295,10 +324,10 @@ pub struct EthStream<S> {
inner: S,
}
```
EthStream's only job is to perform the RLP decoding/encoding, using the `ProtocolMessage::decode()` and `ProtocolMessage::encode()`
functions we looked at earlier.
EthStream performs RLP decoding/encoding using `ProtocolMessage::decode_message(version, &mut bytes)`
and `ProtocolMessage::encode()`, and enforces protocol rules (e.g., prohibiting `Status` after handshake).
[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
```rust,ignore
impl<S, E> Stream for EthStream<S> {
// ...
@@ -306,7 +335,7 @@ impl<S, E> Stream for EthStream<S> {
let this = self.project();
let bytes = ready!(this.inner.poll_next(cx)).unwrap();
// ...
let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
let msg = match ProtocolMessage::decode_message(self.version(), &mut bytes.as_ref()) {
Ok(m) => m,
Err(err) => {
return Poll::Ready(Some(Err(err.into())))
@@ -319,10 +348,12 @@ impl<S, E> Stream for EthStream<S> {
impl<S, E> Sink<EthMessage> for EthStream<S> {
// ...
fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
// ...
if matches!(item, EthMessage::Status(_)) {
let _ = self.project().inner.disconnect(DisconnectReason::ProtocolBreach);
return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
}
let mut bytes = BytesMut::new();
ProtocolMessage::from(item).encode(&mut bytes);
let bytes = bytes.freeze();
self.project().inner.start_send(bytes)?;
Ok(())
@@ -339,9 +370,9 @@ For a session to be established, peers in the Ethereum network must first exchan
To perform these, reth has special `Unauthed` versions of streams described above.
The `UnauthedP2Pstream` does the `Hello` handshake and returns a `P2PStream`.
The `UnauthedP2PStream` does the `Hello` handshake and returns a `P2PStream`.
[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
```rust, ignore
#[pin_project]
pub struct UnauthedP2PStream<S> {
@@ -351,8 +382,8 @@ pub struct UnauthedP2PStream<S> {
impl<S> UnauthedP2PStream<S> {
// ...
pub async fn handshake(mut self, hello: HelloMessage) -> Result<(P2PStream<S>, HelloMessage), Error> {
self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.clone())).into()).await?;
pub async fn handshake(mut self, hello: HelloMessageWithProtocols) -> Result<(P2PStream<S>, HelloMessage), Error> {
self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.message())).into()).await?;
let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next()).await;
let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) {
@@ -360,11 +391,25 @@ impl<S> UnauthedP2PStream<S> {
// ...
}
}?;
let stream = P2PStream::new(self.inner, capability);
let stream = P2PStream::new(self.inner, shared_capabilities);
Ok((stream, their_hello))
}
}
```
Similarly, UnauthedEthStream does the `Status` handshake and returns an `EthStream`. The code is [here](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
Similarly, `UnauthedEthStream` does the `Status` handshake and returns an `EthStream`. It accepts a `UnifiedStatus`
and a `ForkFilter`, and provides a timeout wrapper. The code is [here](../../crates/net/eth-wire/src/ethstream.rs)
### Multiplexing and satellites
`eth-wire` also provides `RlpxProtocolMultiplexer`/`RlpxSatelliteStream` to run the primary `eth` protocol alongside
additional "satellite" protocols (e.g. `snap`) using negotiated `SharedCapabilities`.
## Message variants and versions
- `NewPooledTransactionHashes` differs between ETH66 (`NewPooledTransactionHashes66`) and ETH68 (`NewPooledTransactionHashes68`).
- Starting with ETH67, `GetNodeData` and `NodeData` are removed (decoding them for >=67 yields an error).
- Starting with ETH69:
- `BlockRangeUpdate (0x11)` announces the historical block range served.
- Receipts omit bloom: encoded as `Receipts69` instead of `Receipts`.