chore: touchups

This commit is contained in:
Matthias Seitz
2025-12-16 12:17:16 +01:00
parent 3adac16571
commit 1f93ee97bb
8 changed files with 92 additions and 210 deletions

View File

@@ -12,7 +12,7 @@
extern crate alloc;
mod status;
pub use status::{Status, StatusBuilder, StatusEth69, StatusEth70, StatusMessage, UnifiedStatus};
pub use status::{Status, StatusBuilder, StatusEth69, StatusMessage, UnifiedStatus};
pub mod version;
pub use version::{EthVersion, ProtocolVersion};

View File

@@ -66,10 +66,10 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
// For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
// pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
let message = match message_type {
EthMessageID::Status => EthMessage::Status(if version >= EthVersion::Eth69 {
StatusMessage::Eth69(StatusEth69::decode(buf)?)
} else {
EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
StatusMessage::Legacy(Status::decode(buf)?)
} else {
StatusMessage::Eth69(StatusEth69::decode(buf)?)
}),
EthMessageID::NewBlockHashes => {
EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
@@ -113,22 +113,27 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessageID::GetReceipts => {
if version >= EthVersion::Eth70 {
EthMessage::GetReceipts70(crate::receipts::GetReceipts70::decode(buf)?)
EthMessage::GetReceipts70(RequestPair::decode(buf)?)
} else {
EthMessage::GetReceipts(RequestPair::decode(buf)?)
}
}
EthMessageID::Receipts => {
if version < EthVersion::Eth69 {
EthMessage::Receipts(RequestPair::decode(buf)?)
} else if version < EthVersion::Eth70 {
// with eth69, receipts no longer include the bloom
EthMessage::Receipts69(RequestPair::decode(buf)?)
} else {
// eth/70 continues to omit bloom filters and adds the
// `lastBlockIncomplete` flag, encoded as
// `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
EthMessage::Receipts70(crate::receipts::Receipts70::<N::Receipt>::decode(buf)?)
match version {
v if v >= EthVersion::Eth70 => {
// eth/70 continues to omit bloom filters and adds the
// `lastBlockIncomplete` flag, encoded as
// `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
EthMessage::Receipts70(RequestPair::decode(buf)?)
}
EthVersion::Eth69 => {
// with eth69, receipts no longer include the bloom
EthMessage::Receipts69(RequestPair::decode(buf)?)
}
_ => {
// before eth69 we need to decode the bloom as well
EthMessage::Receipts(RequestPair::decode(buf)?)
}
}
}
EthMessageID::BlockRangeUpdate => {
@@ -217,7 +222,7 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
/// information. And removes the Bloom field from receipts transferred over the protocol.
///
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
/// requests/responses.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
@@ -278,7 +283,7 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Note: Unlike earlier protocol versions, the eth/70 encoding for
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
/// a [`RequestPair`], but with a custom inline encoding.
GetReceipts70(GetReceipts70),
GetReceipts70(RequestPair<GetReceipts70>),
/// Represents a Receipts request-response pair.
#[cfg_attr(
feature = "serde",
@@ -300,7 +305,7 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
/// request id. The type still wraps a [`RequestPair`], but with a custom
/// inline encoding.
Receipts70(Receipts70<N::Receipt>),
Receipts70(RequestPair<Receipts70<N::Receipt>>),
/// Represents a `BlockRangeUpdate` message broadcast to the network.
#[cfg_attr(
feature = "serde",
@@ -363,6 +368,34 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::NodeData(_)
)
}
/// Converts the message types where applicable.
///
/// This handles up/downcasting where appropriate, for example for different receipt request
/// types.
pub fn map_versioned(mut self, version: EthVersion) -> Self {
// For eth/70 peers we send `GetReceipts` using the new eth/70
// encoding with `firstBlockReceiptIndex = 0`, while keeping the
// user-facing `PeerRequest` API unchanged.
if version >= EthVersion::Eth70 {
return match self {
EthMessage::GetReceipts(pair) => {
let RequestPair { request_id, message } = pair;
let req = RequestPair {
request_id,
message: GetReceipts70 {
first_block_receipt_index: 0,
block_hashes: message.0,
},
};
EthMessage::GetReceipts70(req)
}
other => other,
}
}
self
}
}
impl<N: NetworkPrimitives> Encodable for EthMessage<N> {

View File

@@ -21,18 +21,20 @@ pub struct GetReceipts(
///
/// When used with eth/70, the request id is carried by the surrounding
/// [`crate::message::RequestPair`], and the on-wire shape is the flattened list
/// `[request-id, firstBlockReceiptIndex, [blockhash₁, ...]]`.
/// `firstBlockReceiptIndex, [blockhash₁, ...]`.
///
/// See also [eip-7975](https://eips.ethereum.org/EIPS/eip-7975)
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct GetReceipts70Payload {
pub struct GetReceipts70 {
/// Index into the receipts of the first requested block hash.
pub first_block_receipt_index: u64,
/// The block hashes to request receipts for.
pub block_hashes: Vec<B256>,
}
impl alloy_rlp::Encodable for GetReceipts70Payload {
impl alloy_rlp::Encodable for GetReceipts70 {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
self.first_block_receipt_index.encode(out);
self.block_hashes.encode(out);
@@ -43,7 +45,7 @@ impl alloy_rlp::Encodable for GetReceipts70Payload {
}
}
impl alloy_rlp::Decodable for GetReceipts70Payload {
impl alloy_rlp::Decodable for GetReceipts70 {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let first_block_receipt_index = u64::decode(buf)?;
let block_hashes = Vec::<B256>::decode(buf)?;
@@ -51,29 +53,6 @@ impl alloy_rlp::Decodable for GetReceipts70Payload {
}
}
/// Helper type for the full eth/70 request carrying the request id alongside
/// the payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct GetReceipts70(pub crate::message::RequestPair<GetReceipts70Payload>);
impl alloy_rlp::Encodable for GetReceipts70 {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
self.0.encode(out);
}
fn length(&self) -> usize {
self.0.length()
}
}
impl alloy_rlp::Decodable for GetReceipts70 {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
crate::message::RequestPair::<GetReceipts70Payload>::decode(buf).map(Self)
}
}
/// The response to [`GetReceipts`], containing receipt lists that correspond to each block
/// requested.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
@@ -145,14 +124,14 @@ impl<T: TxReceipt> From<Receipts69<T>> for Receipts<T> {
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct Receipts70Payload<T = Receipt> {
pub struct Receipts70<T = Receipt> {
/// Whether the receipts list for the last block is incomplete.
pub last_block_incomplete: bool,
/// Receipts grouped by block.
pub receipts: Vec<Vec<T>>,
}
impl<T> alloy_rlp::Encodable for Receipts70Payload<T>
impl<T> alloy_rlp::Encodable for Receipts70<T>
where
T: alloy_rlp::Encodable,
{
@@ -166,7 +145,7 @@ where
}
}
impl<T> alloy_rlp::Decodable for Receipts70Payload<T>
impl<T> alloy_rlp::Decodable for Receipts70<T>
where
T: alloy_rlp::Decodable,
{
@@ -177,36 +156,7 @@ where
}
}
/// Helper type for the full eth/70 response carrying the request id alongside
/// the payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct Receipts70<T = Receipt>(pub crate::message::RequestPair<Receipts70Payload<T>>);
impl<T> alloy_rlp::Encodable for Receipts70<T>
where
T: alloy_rlp::Encodable,
{
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
self.0.encode(out);
}
fn length(&self) -> usize {
self.0.length()
}
}
impl<T> alloy_rlp::Decodable for Receipts70<T>
where
T: alloy_rlp::Decodable,
{
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
crate::message::RequestPair::<Receipts70Payload<T>>::decode(buf).map(Self)
}
}
impl<T: TxReceipt> Receipts70Payload<T> {
impl<T: TxReceipt> Receipts70<T> {
/// Encodes all receipts with the bloom filter.
///
/// Just like eth/69, eth/70 does not transmit bloom filters over the wire.
@@ -225,7 +175,7 @@ impl<T: TxReceipt> Receipts70Payload<T> {
impl<T: TxReceipt> From<Receipts70<T>> for Receipts<T> {
fn from(receipts: Receipts70<T>) -> Self {
receipts.0.message.into_with_bloom()
receipts.into_with_bloom()
}
}
@@ -382,16 +332,16 @@ mod tests {
#[test]
fn encode_get_receipts70_inline_shape() {
let req = GetReceipts70(RequestPair {
let req = RequestPair {
request_id: 1111,
message: GetReceipts70Payload {
message: GetReceipts70 {
first_block_receipt_index: 0,
block_hashes: vec![
hex!("00000000000000000000000000000000000000000000000000000000deadc0de").into(),
hex!("00000000000000000000000000000000000000000000000000000000feedbeef").into(),
],
},
});
};
let mut out = vec![];
req.encode(&mut out);
@@ -411,19 +361,17 @@ mod tests {
assert_eq!(payload_start - buf.len(), header.payload_length);
let mut buf = out.as_slice();
let decoded = GetReceipts70::decode(&mut buf).unwrap();
let decoded = RequestPair::<GetReceipts70>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed on decode");
assert_eq!(decoded, req);
}
#[test]
fn encode_receipts70_inline_shape() {
let payload: Receipts70Payload<Receipt> = Receipts70Payload {
last_block_incomplete: true,
receipts: vec![vec![Receipt::default()]],
};
let payload: Receipts70<Receipt> =
Receipts70 { last_block_incomplete: true, receipts: vec![vec![Receipt::default()]] };
let resp = Receipts70(RequestPair { request_id: 7, message: payload });
let resp = RequestPair { request_id: 7, message: payload };
let mut out = vec![];
resp.encode(&mut out);
@@ -443,7 +391,7 @@ mod tests {
assert_eq!(receipts[0].len(), 1);
let mut buf = out.as_slice();
let decoded = Receipts70::decode(&mut buf).unwrap();
let decoded = RequestPair::<Receipts70>::decode(&mut buf).unwrap();
assert!(buf.is_empty(), "buffer not fully consumed on decode");
assert_eq!(decoded, resp);
}

View File

@@ -109,11 +109,6 @@ impl UnifiedStatus {
}
}
/// Consume this `UnifiedStatus` and produce the [`StatusEth70`] message used by `eth/70`.
pub fn into_eth70(self) -> StatusEth70 {
self.into_eth69()
}
/// Convert this `UnifiedStatus` into the appropriate `StatusMessage` variant based on version.
pub fn into_message(self) -> StatusMessage {
if self.version >= EthVersion::Eth69 {
@@ -383,9 +378,6 @@ impl Debug for StatusEth69 {
}
}
/// Share eth/69 status with eth/70
pub type StatusEth70 = StatusEth69;
/// `StatusMessage` can store either the Legacy version (with TD), or the eth/69+/eth/70 version
/// (omits TD, includes block range).
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]

View File

@@ -3,8 +3,8 @@
use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, GetReceipts70, GetReceipts70Payload, NetworkPrimitives,
NodeData, PooledTransactions, Receipts, Receipts69, Receipts70Payload, UnifiedStatus,
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
@@ -243,9 +243,9 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The response should be sent through the channel.
GetReceipts70 {
/// The request for receipts.
request: GetReceipts70Payload,
request: GetReceipts70,
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts70Payload<N::Receipt>>>,
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
}
@@ -292,10 +292,7 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
}
Self::GetReceipts70 { request, .. } => {
EthMessage::GetReceipts70(GetReceipts70(RequestPair {
request_id,
message: request.clone(),
}))
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
}
}
}

View File

@@ -10,8 +10,8 @@ use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, GetReceipts70Payload, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70Payload,
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@@ -221,12 +221,12 @@ where
fn on_receipts70_request(
&self,
_peer_id: PeerId,
request: GetReceipts70Payload,
response: oneshot::Sender<RequestResult<Receipts70Payload<C::Receipt>>>,
request: GetReceipts70,
response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
) {
self.metrics.eth_receipts_requests_received_total.increment(1);
let GetReceipts70Payload { first_block_receipt_index, block_hashes } = request;
let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
let mut receipts = Vec::new();
let mut total_bytes = 0usize;
@@ -275,7 +275,7 @@ where
break;
}
let _ = response.send(Ok(Receipts70Payload { last_block_incomplete, receipts }));
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
#[inline]
@@ -430,8 +430,8 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The ID of the peer to request receipts from.
peer_id: PeerId,
/// The specific receipts requested including the `firstBlockReceiptIndex`.
request: GetReceipts70Payload,
request: GetReceipts70,
/// The channel sender for the response containing Receipts70.
response: oneshot::Sender<RequestResult<Receipts70Payload<N::Receipt>>>,
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
}

View File

@@ -3,7 +3,7 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use crate::types::Receipts69;
use crate::types::{Receipts69, Receipts70};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
@@ -11,7 +11,7 @@ use reth_eth_wire::{
message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions,
Receipts, Receipts70, Receipts70Payload, SharedTransactions, Transactions,
Receipts, SharedTransactions, Transactions,
};
use reth_eth_wire_types::RawCapabilityMessage;
use reth_network_api::PeerRequest;
@@ -119,7 +119,7 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a response to a request for receipts using eth/70.
Receipts70 {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts70Payload<N::Receipt>>>,
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
},
}
@@ -181,7 +181,7 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a result containing receipts or an error for eth/69.
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
/// Represents a result containing receipts or an error for eth/70.
Receipts70(RequestResult<Receipts70Payload<N::Receipt>>),
Receipts70(RequestResult<Receipts70<N::Receipt>>),
}
// === impl PeerResponseResult ===
@@ -222,7 +222,7 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::Receipts70(resp) => match resp {
Ok(res) => {
let request = RequestPair { request_id: id, message: res };
Ok(EthMessage::Receipts70(Receipts70(request)))
Ok(EthMessage::Receipts70(request))
}
Err(err) => Err(err),
},

View File

@@ -270,28 +270,8 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
on_request!(req, Receipts, GetReceipts)
}
}
EthMessage::GetReceipts70(req70) => {
let RequestPair {
request_id,
message:
reth_eth_wire::GetReceipts70Payload { first_block_receipt_index, block_hashes },
} = req70.0;
let (tx, response) = oneshot::channel();
let received = ReceivedRequest {
request_id,
rx: PeerResponse::Receipts70 { response },
received: Instant::now(),
};
self.received_requests_from_remote.push(received);
let request =
reth_eth_wire::GetReceipts70Payload { first_block_receipt_index, block_hashes };
self.try_emit_request(PeerMessage::EthRequest(PeerRequest::GetReceipts70 {
request,
response: tx,
}))
.into()
EthMessage::GetReceipts70(req) => {
on_request!(req, Receipts70, GetReceipts70)
}
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts)
@@ -300,53 +280,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
on_response!(resp, GetReceipts69)
}
EthMessage::Receipts70(resp) => {
// Handle eth/70 receipts responses. Support `GetReceipts`,
// `GetReceipts69`, and `GetReceipts70` by converting the payload as needed.
let RequestPair {
request_id,
message: reth_eth_wire::Receipts70Payload { last_block_incomplete, receipts },
} = resp.0;
if let Some(req) = self.inflight_requests.remove(&request_id) {
match req.request {
RequestState::Waiting(PeerRequest::GetReceipts69 { response, .. }) => {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts69");
let message = reth_eth_wire::Receipts69(receipts);
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(PeerRequest::GetReceipts70 { response, .. }) => {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts70");
let message = reth_eth_wire::Receipts70Payload {
last_block_incomplete,
receipts,
};
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(PeerRequest::GetReceipts { response, .. }) => {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received eth/70 Receipts for GetReceipts");
let receipts69 = reth_eth_wire::Receipts69(receipts);
let message: reth_eth_wire::Receipts<N::Receipt> =
receipts69.into_with_bloom();
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now());
}
RequestState::Waiting(request) => {
request.send_bad_response();
}
RequestState::TimedOut => {
// request was already timed out internally
self.update_request_timeout(req.timestamp, Instant::now());
}
}
} else {
trace!(peer_id=?self.remote_peer_id, ?request_id, "received response to unknown request");
// we received a response to a request we never sent
self.on_bad_message();
}
OnIncomingMessageOutcome::Ok
on_response!(resp, GetReceipts70)
}
EthMessage::BlockRangeUpdate(msg) => {
// Validate that earliest <= latest according to the spec
@@ -383,30 +317,8 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let mut msg = request.create_request_message(request_id);
// For eth/70 peers we send `GetReceipts` using the new eth/70
// encoding with `firstBlockReceiptIndex = 0`, while keeping the
// user-facing `PeerRequest` API unchanged.
if self.conn.version() >= EthVersion::Eth70 {
msg = match msg {
EthMessage::GetReceipts(pair) => {
let RequestPair { request_id, message } = pair;
let reth_eth_wire::GetReceipts(block_hashes) = message;
let get70 = reth_eth_wire::GetReceipts70(RequestPair {
request_id,
message: reth_eth_wire::GetReceipts70Payload {
first_block_receipt_index: 0,
block_hashes,
},
});
EthMessage::GetReceipts70(get70)
}
other => other,
};
}
let mut msg = request.create_request_message(request_id).map_versioned(self.conn.version());
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {