diff --git a/docs/crates/eth-wire.md b/docs/crates/eth-wire.md
index cf0c2cc537..cf62ab143e 100644
--- a/docs/crates/eth-wire.md
+++ b/docs/crates/eth-wire.md
@@ -9,48 +9,70 @@ This crate can be thought of as having 2 components:
 2. Abstractions over Tokio Streams that operate on these types.

 (Note that ECIES is implemented in a separate `reth-ecies` crate.)
+Additionally, this crate focuses on stream implementations (P2P and Eth), handshakes, and multiplexing. The protocol
+message types and RLP encoding/decoding live in the separate `eth-wire-types` crate and are re-exported by `eth-wire`
+for convenience.

 ## Types
 The most basic Eth-wire type is a `ProtocolMessage`. It describes all messages that reth can send/receive.

-[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
+[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
 ```rust, ignore
 /// An `eth` protocol message, containing a message ID and payload.
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub struct ProtocolMessage {
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct ProtocolMessage {
     pub message_type: EthMessageID,
-    pub message: EthMessage,
+    pub message: EthMessage,
 }

-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
-pub enum EthMessage {
-    Status(Status),
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum EthMessage {
+    Status(StatusMessage),
     NewBlockHashes(NewBlockHashes),
-    Transactions(Transactions),
-    NewPooledTransactionHashes(NewPooledTransactionHashes),
+    NewBlock(Box),
+    Transactions(Transactions),
+    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
+    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
     GetBlockHeaders(RequestPair),
-    // ...
+    BlockHeaders(RequestPair>),
+    GetBlockBodies(RequestPair),
+    BlockBodies(RequestPair>),
+    GetPooledTransactions(RequestPair),
+    PooledTransactions(RequestPair>),
+    GetNodeData(RequestPair),
+    NodeData(RequestPair),
     GetReceipts(RequestPair),
-    Receipts(RequestPair),
+    Receipts(RequestPair>),
+    Receipts69(RequestPair>),
+    BlockRangeUpdate(BlockRangeUpdate),
 }

 /// Represents message IDs for eth protocol messages.
 #[repr(u8)]
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum EthMessageID {
     Status = 0x00,
     NewBlockHashes = 0x01,
     Transactions = 0x02,
-    // ...
+    GetBlockHeaders = 0x03,
+    BlockHeaders = 0x04,
+    GetBlockBodies = 0x05,
+    BlockBodies = 0x06,
+    NewBlock = 0x07,
+    NewPooledTransactionHashes = 0x08,
+    GetPooledTransactions = 0x09,
+    PooledTransactions = 0x0a,
+    GetNodeData = 0x0d,
     NodeData = 0x0e,
     GetReceipts = 0x0f,
     Receipts = 0x10,
+    BlockRangeUpdate = 0x11,
 }
 ```

 Messages can either be broadcast to the network, or can be a request/response message to a single peer. This 2nd type of message is described using a `RequestPair` struct, which is simply a concatenation of the underlying message with a request id.

-[File: crates/net/eth-wire/src/types/message.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/message.rs)
+[File: crates/net/eth-wire-types/src/message.rs](../../crates/net/eth-wire-types/src/message.rs)
 ```rust, ignore
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
 pub struct RequestPair {
@@ -59,10 +81,8 @@ pub struct RequestPair {
 }
 ```

-Every `Ethmessage` has a corresponding rust struct that implements the `Encodable` and `Decodable` traits.
-These traits are defined as follows:
-
-[Crate: crates/rlp](https://github.com/paradigmxyz/reth/tree/1563506aea09049a85e5cc72c2894f3f7a371581/crates/rlp)
+Every `EthMessage` has a corresponding Rust struct that implements `alloy_rlp::Encodable` and `alloy_rlp::Decodable`
+(often via derive macros like `RlpEncodable`/`RlpDecodable`). These traits are defined in `alloy_rlp`:
 ```rust, ignore
 pub trait Decodable: Sized {
     fn decode(buf: &mut &[u8]) -> alloy_rlp::Result;
@@ -72,10 +92,11 @@ pub trait Encodable {
     fn length(&self) -> usize;
 }
 ```
-These traits describe how the `Ethmessage` should be serialized/deserialized into raw bytes using the RLP format.
-In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by the `common/rlp` and `common/rlp-derive` crates.
+These traits describe how the `EthMessage` should be serialized/deserialized into raw bytes using the RLP format.
+In reth all [RLP](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp/) encode/decode operations are handled by `alloy_rlp` and the derive macros used in `eth-wire-types`.

-Note that the `ProtocolMessage` itself implements these traits, so any stream of bytes can be converted into it by calling `ProtocolMessage::decode()` and vice versa with `ProtocolMessage::encode()`. The message type is determined by the first byte of the byte stream.
+Note: `ProtocolMessage` implements `Encodable`, while decoding is performed via
+`ProtocolMessage::decode_message(version, &mut bytes)` because decoding must respect the negotiated `EthVersion`.

 ### Example: The Transactions message
 Let's understand how an `EthMessage` is implemented by taking a look at the `Transactions` Message.
 The eth specification describes a Transaction message as a list of RLP-encoded transactions:
@@ -93,17 +114,17 @@ The items in the list are transactions in the format described in the main Ether

 In reth, this is represented as:

-[File: crates/net/eth-wire/src/types/broadcast.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/types/broadcast.rs)
+[File: crates/net/eth-wire-types/src/broadcast.rs](../../crates/net/eth-wire-types/src/broadcast.rs)
 ```rust,ignore
-pub struct Transactions(
+pub struct Transactions(
     /// New transactions for the peer to include in its mempool.
-    pub Vec,
+    pub Vec,
 );
 ```

-And the corresponding trait implementations are present in the primitives crate.
+And the corresponding transaction type is defined here:

-[File: crates/primitives/src/transaction/mod.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/primitives/src/transaction/mod.rs)
+[File: crates/ethereum/primitives/src/transaction.rs](../../crates/ethereum/primitives/src/transaction.rs)
 ```rust, ignore
 #[reth_codec]
 #[derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Default, Serialize, Deserialize)]
@@ -146,7 +167,7 @@ The lowest level stream to communicate with other peers is the P2P stream. It ta

 Decompression/Compression of bytes is done with snappy algorithm ([EIP 706](https://eips.ethereum.org/EIPS/eip-706)) using the external `snap` crate.

-[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
+[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
 ```rust,ignore
 #[pin_project]
 pub struct P2PStream {
@@ -155,23 +176,29 @@ pub struct P2PStream {
     encoder: snap::raw::Encoder,
     decoder: snap::raw::Decoder,
     pinger: Pinger,
-    shared_capability: SharedCapability,
+    /// Negotiated shared capabilities
+    shared_capabilities: SharedCapabilities,
+    /// Outgoing messages buffered for sending to the underlying stream.
     outgoing_messages: VecDeque,
+    /// Maximum number of messages that can be buffered before yielding backpressure.
+    outgoing_message_buffer_capacity: usize,
+    /// Whether this stream is currently in the process of gracefully disconnecting.
     disconnecting: bool,
 }
 ```
 ### Pinger
-To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of how many pings
-we have sent/received and the timeouts associated with them.
+To manage pinging, an instance of the `Pinger` struct is used. This is a state machine that keeps track of pings
+we have sent/received and the timeout associated with them.

-[File: crates/net/eth-wire/src/pinger.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/pinger.rs)
+[File: crates/net/eth-wire/src/pinger.rs](../../crates/net/eth-wire/src/pinger.rs)
 ```rust,ignore
 #[derive(Debug)]
 pub(crate) struct Pinger {
     /// The timer used for the next ping.
     ping_interval: Interval,
-    /// The timer used for the next ping.
+    /// The timer used to detect a ping timeout.
     timeout_timer: Pin>,
+    /// The timeout duration for each ping.
     timeout: Duration,
     state: PingState,
 }
@@ -205,7 +232,7 @@ pub(crate) fn poll_ping(
                 }
             }
             PingState::WaitingForPong => {
-                if self.timeout_timer.is_elapsed() {
+                if self.timeout_timer.as_mut().poll(cx).is_ready() {
                     self.state = PingState::TimedOut;
                     return Poll::Ready(Ok(PingerEvent::Timeout))
                 }
@@ -223,7 +250,7 @@ To send and receive data, the P2PStream itself is a future that implements the `

 For the `Stream` trait, the `inner` stream is polled, decompressed and returned. Most of the code is just error handling and is omitted here for clarity.

-[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
+[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
 ```rust,ignore

 impl Stream for P2PStream {
@@ -240,7 +267,8 @@ impl Stream for P2PStream {
             let mut decompress_buf = BytesMut::zeroed(decompressed_len + 1);
             this.decoder.decompress(&bytes[1..], &mut decompress_buf[1..])?;
             // ... Omitted Error handling
-            decompress_buf[0] = bytes[0] - this.shared_capability.offset();
+            // Normalize IDs: reserved p2p range is 0x00..=0x0f; subprotocols start at 0x10
+            decompress_buf[0] = bytes[0] - MAX_RESERVED_MESSAGE_ID - 1;
             return Poll::Ready(Some(Ok(decompress_buf)))
         }
     }
@@ -250,7 +278,7 @@ impl Stream for P2PStream {

 Similarly, for the `Sink` trait, we do the reverse, compressing and sending data out to the `inner` stream.
 The important functions in this trait are shown below.

-[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
+[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
 ```rust, ignore
 impl Sink for P2PStream {
     fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
@@ -258,7 +286,8 @@ impl Sink for P2PStream {
         let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
         let compressed_size = this.encoder.compress(&item[1..], &mut compressed[1..])?;
         compressed.truncate(compressed_size + 1);
-        compressed[0] = item[0] + this.shared_capability.offset();
+        // Mask subprotocol IDs into global space above reserved p2p IDs
+        compressed[0] = item[0] + MAX_RESERVED_MESSAGE_ID + 1;
         this.outgoing_messages.push_back(compressed.freeze());
         Ok(())
     }
@@ -285,9 +314,9 @@ impl Sink for P2PStream {

 ## EthStream

-The EthStream is very simple, it does not keep track of any state, it simply wraps the P2Pstream.
+The EthStream wraps a stream and handles eth message (RLP) encoding/decoding with respect to the negotiated `EthVersion`.

-[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
+[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
 ```rust,ignore
 #[pin_project]
 pub struct EthStream {
@@ -295,10 +324,10 @@ pub struct EthStream {
     inner: S,
 }
 ```
-EthStream's only job is to perform the RLP decoding/encoding, using the `ProtocolMessage::decode()` and `ProtocolMessage::encode()`
-functions we looked at earlier.
+EthStream performs RLP decoding/encoding using `ProtocolMessage::decode_message(version, &mut bytes)`
+and `ProtocolMessage::encode()`, and enforces protocol rules (e.g., prohibiting `Status` after handshake).

-[File: crates/net/eth-wire/src/ethstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
+[File: crates/net/eth-wire/src/ethstream.rs](../../crates/net/eth-wire/src/ethstream.rs)
 ```rust,ignore
 impl Stream for EthStream {
     // ...
@@ -306,7 +335,7 @@ impl Stream for EthStream {
         let this = self.project();
         let bytes = ready!(this.inner.poll_next(cx)).unwrap();
         // ...
-        let msg = match ProtocolMessage::decode(&mut bytes.as_ref()) {
+        let msg = match ProtocolMessage::decode_message(self.version(), &mut bytes.as_ref()) {
             Ok(m) => m,
             Err(err) => {
                 return Poll::Ready(Some(Err(err.into())))
@@ -319,10 +348,12 @@ impl Stream for EthStream {
 impl Sink for EthStream {
     // ...
     fn start_send(self: Pin<&mut Self>, item: EthMessage) -> Result<(), Self::Error> {
-        // ...
+        if matches!(item, EthMessage::Status(_)) {
+            let _ = self.project().inner.disconnect(DisconnectReason::ProtocolBreach);
+            return Err(EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake))
+        }
         let mut bytes = BytesMut::new();
         ProtocolMessage::from(item).encode(&mut bytes);
-        let bytes = bytes.freeze();

         self.project().inner.start_send(bytes)?;
         Ok(())
@@ -339,9 +370,9 @@ For a session to be established, peers in the Ethereum network must first exchan

 To perform these, reth has special `Unauthed` versions of streams described above.

-The `UnauthedP2Pstream` does the `Hello` handshake and returns a `P2PStream`.
+The `UnauthedP2PStream` does the `Hello` handshake and returns a `P2PStream`.

-[File: crates/net/eth-wire/src/p2pstream.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/p2pstream.rs)
+[File: crates/net/eth-wire/src/p2pstream.rs](../../crates/net/eth-wire/src/p2pstream.rs)
 ```rust, ignore
 #[pin_project]
 pub struct UnauthedP2PStream {
@@ -351,8 +382,8 @@ pub struct UnauthedP2PStream {

 impl UnauthedP2PStream {
     // ...
-    pub async fn handshake(mut self, hello: HelloMessage) -> Result<(P2PStream, HelloMessage), Error> {
-        self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.clone())).into()).await?;
+    pub async fn handshake(mut self, hello: HelloMessageWithProtocols) -> Result<(P2PStream, HelloMessage), Error> {
+        self.inner.send(alloy_rlp::encode(P2PMessage::Hello(hello.message())).into()).await?;
         let first_message_bytes = tokio::time::timeout(HANDSHAKE_TIMEOUT, self.inner.next()).await;

         let their_hello = match P2PMessage::decode(&mut &first_message_bytes[..]) {
@@ -360,11 +391,25 @@ impl UnauthedP2PStream {
             // ...
             }
         }?;
-        let stream = P2PStream::new(self.inner, capability);
+        let stream = P2PStream::new(self.inner, shared_capabilities);

         Ok((stream, their_hello))
     }
 }
 ```

-Similarly, UnauthedEthStream does the `Status` handshake and returns an `EthStream`. The code is [here](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/eth-wire/src/ethstream.rs)
+Similarly, `UnauthedEthStream` does the `Status` handshake and returns an `EthStream`. It accepts a `UnifiedStatus`
+and a `ForkFilter`, and provides a timeout wrapper. The code is [here](../../crates/net/eth-wire/src/ethstream.rs)
+
+### Multiplexing and satellites
+
+`eth-wire` also provides `RlpxProtocolMultiplexer`/`RlpxSatelliteStream` to run the primary `eth` protocol alongside
+additional "satellite" protocols (e.g. `snap`) using negotiated `SharedCapabilities`.
+
+## Message variants and versions
+
+- `NewPooledTransactionHashes` differs between ETH66 (`NewPooledTransactionHashes66`) and ETH68 (`NewPooledTransactionHashes68`).
+- Starting with ETH67, `GetNodeData` and `NodeData` are removed (decoding them for >=67 yields an error).
+- Starting with ETH69:
+  - `BlockRangeUpdate (0x11)` announces the historical block range served.
+  - Receipts omit bloom: encoded as `Receipts69` instead of `Receipts`.
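
The version rules in the "Message variants and versions" list can be sketched as a small validity check on message IDs. This is a minimal, self-contained illustration and not reth's implementation: the `Version` enum and the `is_valid_for_version` function are hypothetical names introduced here. Only the message IDs from the `EthMessageID` listing and the version rules stated in this diff are encoded.

```rust
/// Hypothetical stand-in for the negotiated `eth` protocol version.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Version {
    Eth66,
    Eth67,
    Eth68,
    Eth69,
}

// Message IDs as listed in the `EthMessageID` enum in this document.
const GET_NODE_DATA: u8 = 0x0d;
const NODE_DATA: u8 = 0x0e;
const BLOCK_RANGE_UPDATE: u8 = 0x11;

/// Returns whether a message ID may appear on the wire for `version`,
/// following only the rules listed in this document (assumption: no other
/// version-specific restrictions are modelled).
fn is_valid_for_version(id: u8, version: Version) -> bool {
    match id {
        // `GetNodeData`/`NodeData` are removed starting with ETH67.
        GET_NODE_DATA | NODE_DATA => version < Version::Eth67,
        // `BlockRangeUpdate` is introduced with ETH69.
        BLOCK_RANGE_UPDATE => version >= Version::Eth69,
        // 0x0b and 0x0c do not appear in the enum listing above.
        0x0b | 0x0c => false,
        // The remaining IDs up to `Receipts` (0x10) are treated as always valid.
        0x00..=0x10 => true,
        // Anything above 0x11 is unknown to this sketch.
        _ => false,
    }
}

fn main() {
    let versions = [Version::Eth66, Version::Eth67, Version::Eth68, Version::Eth69];
    for version in versions {
        println!(
            "{:?}: GetNodeData valid = {}, BlockRangeUpdate valid = {}",
            version,
            is_valid_for_version(GET_NODE_DATA, version),
            is_valid_for_version(BLOCK_RANGE_UPDATE, version),
        );
    }
}
```

A check of this shape is one reason decoding needs the negotiated version as an input: the same leading ID byte can be acceptable under one version and an error under another.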