feat: implement variants for BAL devp2p variants (#22024)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Karl Yu
2026-02-14 16:22:26 +08:00
committed by GitHub
parent df22d38224
commit 4b0fa8a330
15 changed files with 373 additions and 28 deletions

View File

@@ -0,0 +1,27 @@
//! Implements the `GetBlockAccessLists` and `BlockAccessLists` message types.
use alloc::vec::Vec;
use alloy_primitives::{Bytes, B256};
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use reth_codecs_derive::add_arbitrary_tests;
/// A request for block access lists from the given block hashes.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct GetBlockAccessLists(
/// The block hashes to request block access lists for.
pub Vec<B256>,
);
/// Response for [`GetBlockAccessLists`] containing one BAL per requested block hash.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp)]
pub struct BlockAccessLists(
/// The requested block access lists as opaque bytes. Unavailable entries are represented by
/// empty byte slices.
pub Vec<Bytes>,
);

View File

@@ -169,7 +169,10 @@ impl NewPooledTransactionHashes {
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
}
Self::Eth68(_) => {
matches!(version, EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70)
matches!(
version,
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
)
}
}
}

View File

@@ -110,6 +110,11 @@ impl Capability {
Self::eth(EthVersion::Eth70)
}
/// Returns the [`EthVersion::Eth71`] capability.
pub const fn eth_71() -> Self {
Self::eth(EthVersion::Eth71)
}
/// Whether this is eth v66 protocol.
#[inline]
pub fn is_eth_v66(&self) -> bool {
@@ -140,6 +145,12 @@ impl Capability {
self.name == "eth" && self.version == 70
}
/// Whether this is eth v71.
#[inline]
pub fn is_eth_v71(&self) -> bool {
self.name == "eth" && self.version == 71
}
/// Whether this is any eth version.
#[inline]
pub fn is_eth(&self) -> bool {
@@ -147,7 +158,8 @@ impl Capability {
self.is_eth_v67() ||
self.is_eth_v68() ||
self.is_eth_v69() ||
self.is_eth_v70()
self.is_eth_v70() ||
self.is_eth_v71()
}
}
@@ -167,7 +179,7 @@ impl From<EthVersion> for Capability {
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for Capability {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let version = u.int_in_range(66..=70)?; // Valid eth protocol versions are 66-70
let version = u.int_in_range(66..=71)?; // Valid eth protocol versions are 66-71
// Only generate valid eth protocol name for now since it's the only supported protocol
Ok(Self::new_static("eth", version))
}
@@ -183,6 +195,7 @@ pub struct Capabilities {
eth_68: bool,
eth_69: bool,
eth_70: bool,
eth_71: bool,
}
impl Capabilities {
@@ -194,6 +207,7 @@ impl Capabilities {
eth_68: value.iter().any(Capability::is_eth_v68),
eth_69: value.iter().any(Capability::is_eth_v69),
eth_70: value.iter().any(Capability::is_eth_v70),
eth_71: value.iter().any(Capability::is_eth_v71),
inner: value,
}
}
@@ -212,7 +226,7 @@ impl Capabilities {
/// Whether the peer supports `eth` sub-protocol.
#[inline]
pub const fn supports_eth(&self) -> bool {
self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
self.eth_71 || self.eth_70 || self.eth_69 || self.eth_68 || self.eth_67 || self.eth_66
}
/// Whether this peer supports eth v66 protocol.
@@ -244,6 +258,12 @@ impl Capabilities {
pub const fn supports_eth_v70(&self) -> bool {
self.eth_70
}
/// Whether this peer supports eth v71 protocol.
#[inline]
pub const fn supports_eth_v71(&self) -> bool {
self.eth_71
}
}
impl From<Vec<Capability>> for Capabilities {
@@ -268,6 +288,7 @@ impl Decodable for Capabilities {
eth_68: inner.iter().any(Capability::is_eth_v68),
eth_69: inner.iter().any(Capability::is_eth_v69),
eth_70: inner.iter().any(Capability::is_eth_v70),
eth_71: inner.iter().any(Capability::is_eth_v71),
inner,
})
}

View File

@@ -38,6 +38,9 @@ pub use state::*;
pub mod receipts;
pub use receipts::*;
pub mod block_access_lists;
pub use block_access_lists::*;
pub mod disconnect_reason;
pub use disconnect_reason::*;

View File

@@ -1,4 +1,4 @@
//! Implements Ethereum wire protocol for versions 66 through 70.
//! Implements Ethereum wire protocol for versions 66 through 71.
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
//! Handles compatibility with [`EthVersion`].
//!
@@ -7,10 +7,10 @@
//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
use super::{
broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
Transactions,
broadcast::NewBlockHashes, BlockAccessLists, BlockBodies, BlockHeaders, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData,
PooledTransactions, Receipts, Status, StatusEth69, Transactions,
};
use crate::{
status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
@@ -168,6 +168,18 @@ impl<N: NetworkPrimitives> ProtocolMessage<N> {
}
EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
}
EthMessageID::GetBlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::GetBlockAccessLists))
}
EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::BlockAccessLists => {
if version < EthVersion::Eth71 {
return Err(MessageError::Invalid(version, EthMessageID::BlockAccessLists))
}
EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
}
EthMessageID::Other(_) => {
let raw_payload = Bytes::copy_from_slice(buf);
buf.advance(raw_payload.len());
@@ -250,6 +262,8 @@ impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMes
///
/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
/// requests/responses.
///
/// The `eth/71` draft extends eth/70 with block access list request/response messages.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
@@ -310,6 +324,8 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
/// a [`RequestPair`], but with a custom inline encoding.
GetReceipts70(RequestPair<GetReceipts70>),
/// Represents a `GetBlockAccessLists` request-response pair for eth/71.
GetBlockAccessLists(RequestPair<GetBlockAccessLists>),
/// Represents a Receipts request-response pair.
#[cfg_attr(
feature = "serde",
@@ -332,6 +348,8 @@ pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// request id. The type still wraps a [`RequestPair`], but with a custom
/// inline encoding.
Receipts70(RequestPair<Receipts70<N::Receipt>>),
/// Represents a `BlockAccessLists` request-response pair for eth/71.
BlockAccessLists(RequestPair<BlockAccessLists>),
/// Represents a `BlockRangeUpdate` message broadcast to the network.
#[cfg_attr(
feature = "serde",
@@ -364,6 +382,8 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
Self::GetBlockAccessLists(_) => EthMessageID::GetBlockAccessLists,
Self::BlockAccessLists(_) => EthMessageID::BlockAccessLists,
Self::Other(msg) => EthMessageID::Other(msg.id as u8),
}
}
@@ -376,6 +396,7 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::GetBlockHeaders(_) |
Self::GetReceipts(_) |
Self::GetReceipts70(_) |
Self::GetBlockAccessLists(_) |
Self::GetPooledTransactions(_) |
Self::GetNodeData(_)
)
@@ -389,6 +410,7 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::Receipts(_) |
Self::Receipts69(_) |
Self::Receipts70(_) |
Self::BlockAccessLists(_) |
Self::BlockHeaders(_) |
Self::BlockBodies(_) |
Self::NodeData(_)
@@ -443,9 +465,11 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.encode(out),
Self::GetReceipts(request) => request.encode(out),
Self::GetReceipts70(request) => request.encode(out),
Self::GetBlockAccessLists(request) => request.encode(out),
Self::Receipts(receipts) => receipts.encode(out),
Self::Receipts69(receipt69) => receipt69.encode(out),
Self::Receipts70(receipt70) => receipt70.encode(out),
Self::BlockAccessLists(block_access_lists) => block_access_lists.encode(out),
Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
Self::Other(unknown) => out.put_slice(&unknown.payload),
}
@@ -468,9 +492,11 @@ impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
Self::NodeData(data) => data.length(),
Self::GetReceipts(request) => request.length(),
Self::GetReceipts70(request) => request.length(),
Self::GetBlockAccessLists(request) => request.length(),
Self::Receipts(receipts) => receipts.length(),
Self::Receipts69(receipt69) => receipt69.length(),
Self::Receipts70(receipt70) => receipt70.length(),
Self::BlockAccessLists(block_access_lists) => block_access_lists.length(),
Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
Self::Other(unknown) => unknown.length(),
}
@@ -559,6 +585,14 @@ pub enum EthMessageID {
///
/// Introduced in Eth69
BlockRangeUpdate = 0x11,
/// Requests block access lists.
///
/// Introduced in Eth71
GetBlockAccessLists = 0x12,
/// Represents block access lists.
///
/// Introduced in Eth71
BlockAccessLists = 0x13,
/// Represents unknown message types.
Other(u8),
}
@@ -583,13 +617,17 @@ impl EthMessageID {
Self::GetReceipts => 0x0f,
Self::Receipts => 0x10,
Self::BlockRangeUpdate => 0x11,
Self::GetBlockAccessLists => 0x12,
Self::BlockAccessLists => 0x13,
Self::Other(value) => *value, // Return the stored `u8`
}
}
/// Returns the max value for the given version.
pub const fn max(version: EthVersion) -> u8 {
if version as u8 >= EthVersion::Eth69 as u8 {
if version.is_eth71() {
Self::BlockAccessLists.to_u8()
} else if version.is_eth69_or_newer() {
Self::BlockRangeUpdate.to_u8()
} else {
Self::Receipts.to_u8()
@@ -634,6 +672,8 @@ impl Decodable for EthMessageID {
0x0f => Self::GetReceipts,
0x10 => Self::Receipts,
0x11 => Self::BlockRangeUpdate,
0x12 => Self::GetBlockAccessLists,
0x13 => Self::BlockAccessLists,
unknown => Self::Other(*unknown),
};
buf.advance(1);
@@ -662,6 +702,8 @@ impl TryFrom<usize> for EthMessageID {
0x0f => Ok(Self::GetReceipts),
0x10 => Ok(Self::Receipts),
0x11 => Ok(Self::BlockRangeUpdate),
0x12 => Ok(Self::GetBlockAccessLists),
0x13 => Ok(Self::BlockAccessLists),
_ => Err("Invalid message ID"),
}
}
@@ -742,8 +784,9 @@ where
mod tests {
use super::MessageError;
use crate::{
message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
message::RequestPair, BlockAccessLists, EthMessage, EthMessageID, EthNetworkPrimitives,
EthVersion, GetBlockAccessLists, GetNodeData, NodeData, ProtocolMessage,
RawCapabilityMessage,
};
use alloy_primitives::hex;
use alloy_rlp::{Decodable, Encodable, Error};
@@ -784,6 +827,60 @@ mod tests {
assert!(matches!(msg, Err(MessageError::Invalid(..))));
}
#[test]
fn test_bal_message_version_gating() {
let get_block_access_lists =
EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1337,
message: GetBlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::GetBlockAccessLists,
message: get_block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::GetBlockAccessLists))
));
let block_access_lists =
EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
request_id: 1337,
message: BlockAccessLists(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::BlockAccessLists,
message: block_access_lists,
});
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth70,
&mut &buf[..],
);
assert!(matches!(
msg,
Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::BlockAccessLists))
));
}
#[test]
fn test_bal_message_eth71_roundtrip() {
let msg = ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(
RequestPair { request_id: 42, message: GetBlockAccessLists(vec![]) },
));
let encoded = encode(msg.clone());
let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
EthVersion::Eth71,
&mut &encoded[..],
)
.unwrap();
assert_eq!(decoded, msg);
}
#[test]
fn request_pair_encode() {
let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

View File

@@ -29,6 +29,8 @@ pub enum EthVersion {
Eth69 = 69,
/// The `eth` protocol version 70.
Eth70 = 70,
/// The `eth` protocol version 71.
Eth71 = 71,
}
impl EthVersion {
@@ -62,9 +64,19 @@ impl EthVersion {
pub const fn is_eth70(&self) -> bool {
matches!(self, Self::Eth70)
}
/// Returns true if the version is eth/71
pub const fn is_eth71(&self) -> bool {
matches!(self, Self::Eth71)
}
/// Returns true if the version is eth/69 or newer.
pub const fn is_eth69_or_newer(&self) -> bool {
matches!(self, Self::Eth69 | Self::Eth70 | Self::Eth71)
}
}
/// RLP encodes `EthVersion` as a single byte (66-69).
/// RLP encodes `EthVersion` as a single byte (66-71).
impl Encodable for EthVersion {
fn encode(&self, out: &mut dyn BufMut) {
(*self as u8).encode(out)
@@ -76,7 +88,7 @@ impl Encodable for EthVersion {
}
/// RLP decodes a single byte into `EthVersion`.
/// Returns error if byte is not a valid version (66-69).
/// Returns error if byte is not a valid version (66-71).
impl Decodable for EthVersion {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let version = u8::decode(buf)?;
@@ -104,6 +116,7 @@ impl TryFrom<&str> for EthVersion {
"68" => Ok(Self::Eth68),
"69" => Ok(Self::Eth69),
"70" => Ok(Self::Eth70),
"71" => Ok(Self::Eth71),
_ => Err(ParseVersionError(s.to_string())),
}
}
@@ -129,6 +142,7 @@ impl TryFrom<u8> for EthVersion {
68 => Ok(Self::Eth68),
69 => Ok(Self::Eth69),
70 => Ok(Self::Eth70),
71 => Ok(Self::Eth71),
_ => Err(ParseVersionError(u.to_string())),
}
}
@@ -159,6 +173,7 @@ impl From<EthVersion> for &'static str {
EthVersion::Eth68 => "68",
EthVersion::Eth69 => "69",
EthVersion::Eth70 => "70",
EthVersion::Eth71 => "71",
}
}
}
@@ -216,6 +231,7 @@ mod tests {
assert_eq!(EthVersion::Eth68, EthVersion::try_from("68").unwrap());
assert_eq!(EthVersion::Eth69, EthVersion::try_from("69").unwrap());
assert_eq!(EthVersion::Eth70, EthVersion::try_from("70").unwrap());
assert_eq!(EthVersion::Eth71, EthVersion::try_from("71").unwrap());
}
#[test]
@@ -225,6 +241,7 @@ mod tests {
assert_eq!(EthVersion::Eth68, "68".parse().unwrap());
assert_eq!(EthVersion::Eth69, "69".parse().unwrap());
assert_eq!(EthVersion::Eth70, "70".parse().unwrap());
assert_eq!(EthVersion::Eth71, "71".parse().unwrap());
}
#[test]
@@ -235,6 +252,7 @@ mod tests {
EthVersion::Eth68,
EthVersion::Eth69,
EthVersion::Eth70,
EthVersion::Eth71,
];
for version in versions {
@@ -253,6 +271,7 @@ mod tests {
(68_u8, Ok(EthVersion::Eth68)),
(69_u8, Ok(EthVersion::Eth69)),
(70_u8, Ok(EthVersion::Eth70)),
(71_u8, Ok(EthVersion::Eth71)),
(65_u8, Err(RlpError::Custom("invalid eth version"))),
];

View File

@@ -294,7 +294,8 @@ mod tests {
use alloy_primitives::B256;
use alloy_rlp::Encodable;
use reth_eth_wire_types::{
message::RequestPair, GetAccountRangeMessage, GetBlockHeaders, HeadersDirection,
message::RequestPair, GetAccountRangeMessage, GetBlockAccessLists, GetBlockHeaders,
HeadersDirection,
};
// Helper to create eth message and its bytes
@@ -419,4 +420,40 @@ mod tests {
let snap_boundary_result = inner.decode_message(snap_boundary_bytes);
assert!(snap_boundary_result.is_err());
}
#[test]
fn test_eth70_message_id_0x12_is_snap() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth70);
let snap_msg = SnapProtocolMessage::GetAccountRange(GetAccountRangeMessage {
request_id: 1,
root_hash: B256::default(),
starting_hash: B256::default(),
limit_hash: B256::default(),
response_bytes: 1000,
});
let encoded = inner.encode_snap_message(snap_msg);
assert_eq!(encoded[0], EthMessageID::message_count(EthVersion::Eth70));
let decoded = inner.decode_message(BytesMut::from(&encoded[..])).unwrap();
assert!(matches!(decoded, EthSnapMessage::Snap(_)));
}
#[test]
fn test_eth71_message_id_0x12_is_eth() {
let inner = EthSnapStreamInner::<EthNetworkPrimitives>::new(EthVersion::Eth71);
let eth_msg = EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
request_id: 1,
message: GetBlockAccessLists(vec![B256::ZERO]),
});
let protocol_msg = ProtocolMessage::from(eth_msg.clone());
let mut buf = Vec::new();
protocol_msg.encode(&mut buf);
let decoded = inner.decode_message(BytesMut::from(&buf[..])).unwrap();
let EthSnapMessage::Eth(decoded_eth) = decoded else {
panic!("expected eth message");
};
assert_eq!(decoded_eth, eth_msg);
}
}

View File

@@ -84,5 +84,7 @@ mod tests {
assert_eq!(Protocol::eth(EthVersion::Eth67).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth68).messages(), 17);
assert_eq!(Protocol::eth(EthVersion::Eth69).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth70).messages(), 18);
assert_eq!(Protocol::eth(EthVersion::Eth71).messages(), 20);
}
}

View File

@@ -1,10 +1,11 @@
//! API related to listening for network events.
use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
Receipts70, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
@@ -252,6 +253,15 @@ pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel to send the response for receipts.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Requests block access lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The request for block access lists.
request: GetBlockAccessLists,
/// The channel to send the response for block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
}
// === impl PeerRequest ===
@@ -272,9 +282,19 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns true if this request is supported for the negotiated eth protocol version.
#[inline]
pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
match self {
Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
_ => true,
}
}
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
match self {
@@ -299,6 +319,12 @@ impl<N: NetworkPrimitives> PeerRequest<N> {
Self::GetReceipts70 { request, .. } => {
EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
}
Self::GetBlockAccessLists { request, .. } => {
EthMessage::GetBlockAccessLists(RequestPair {
request_id,
message: request.clone(),
})
}
}
}
@@ -349,3 +375,18 @@ impl<R> fmt::Debug for PeerRequestSender<R> {
f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_block_access_lists_version_support() {
let (tx, _rx) = oneshot::channel();
let req: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!req.is_supported_by_eth_version(EthVersion::Eth70));
assert!(req.is_supported_by_eth_version(EthVersion::Eth71));
}
}

View File

@@ -6,12 +6,13 @@ use crate::{
};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::Bytes;
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
Receipts69, Receipts70,
BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists,
GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection,
NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
@@ -281,6 +282,19 @@ where
let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
}
/// Handles [`GetBlockAccessLists`] queries.
///
/// For now this returns one empty BAL per requested hash.
fn on_block_access_lists_request(
&self,
_peer_id: PeerId,
request: GetBlockAccessLists,
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
) {
let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
let _ = response.send(Ok(BlockAccessLists(access_lists)));
}
#[inline]
fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
where
@@ -352,6 +366,9 @@ where
IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
this.on_receipts70_request(peer_id, request, response)
}
IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
this.on_block_access_lists_request(peer_id, request, response)
}
}
},
);
@@ -437,4 +454,15 @@ pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The channel sender for the response containing Receipts70.
response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
},
/// Request Block Access Lists from the peer.
///
/// The response should be sent through the channel.
GetBlockAccessLists {
/// The ID of the peer to request block access lists from.
peer_id: PeerId,
/// The requested block hashes.
request: GetBlockAccessLists,
/// The channel sender for the response containing block access lists.
response: oneshot::Sender<RequestResult<BlockAccessLists>>,
},
}

View File

@@ -551,6 +551,13 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
response,
})
}
PeerRequest::GetBlockAccessLists { request, response } => {
self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
peer_id,
request,
response,
})
}
PeerRequest::GetPooledTransactions { request, response } => {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,

View File

@@ -3,7 +3,7 @@
//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
use crate::types::{Receipts69, Receipts70};
use crate::types::{BlockAccessLists, Receipts69, Receipts70};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_primitives::{Bytes, B256};
use futures::FutureExt;
@@ -121,6 +121,11 @@ pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The receiver channel for the response to a receipts request.
response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
},
/// Represents a response to a request for block access lists.
BlockAccessLists {
/// The receiver channel for the response to a block access lists request.
response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
},
}
// === impl PeerResponse ===
@@ -160,6 +165,10 @@ impl<N: NetworkPrimitives> PeerResponse<N> {
Ok(res) => PeerResponseResult::Receipts70(res),
Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
},
Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
Ok(res) => PeerResponseResult::BlockAccessLists(res),
Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
},
};
Poll::Ready(res)
}
@@ -182,6 +191,8 @@ pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
/// Represents a result containing receipts or an error for eth/70.
Receipts70(RequestResult<Receipts70<N::Receipt>>),
/// Represents a result containing block access lists or an error.
BlockAccessLists(RequestResult<BlockAccessLists>),
}
// === impl PeerResponseResult ===
@@ -226,6 +237,13 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
}
Err(err) => Err(err),
},
Self::BlockAccessLists(resp) => match resp {
Ok(res) => {
let request = RequestPair { request_id: id, message: res };
Ok(EthMessage::BlockAccessLists(request))
}
Err(err) => Err(err),
},
}
}
@@ -239,6 +257,7 @@ impl<N: NetworkPrimitives> PeerResponseResult<N> {
Self::Receipts(res) => res.as_ref().err(),
Self::Receipts69(res) => res.as_ref().err(),
Self::Receipts70(res) => res.as_ref().err(),
Self::BlockAccessLists(res) => res.as_ref().err(),
}
}

View File

@@ -282,6 +282,12 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
EthMessage::Receipts70(resp) => {
on_response!(resp, GetReceipts70)
}
EthMessage::GetBlockAccessLists(req) => {
on_request!(req, BlockAccessLists, GetBlockAccessLists)
}
EthMessage::BlockAccessLists(resp) => {
on_response!(resp, GetBlockAccessLists)
}
EthMessage::BlockRangeUpdate(msg) => {
// Validate that earliest <= latest according to the spec
if msg.earliest > msg.latest {
@@ -316,9 +322,22 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
/// Handle an internal peer request that will be sent to the remote.
fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
let version = self.conn.version();
if !Self::is_request_supported_for_version(&request, version) {
debug!(
target: "net",
?request,
peer_id=?self.remote_peer_id,
?version,
"Request not supported for negotiated eth version",
);
request.send_err_response(RequestError::UnsupportedCapability);
return;
}
let request_id = self.next_id();
trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
let msg = request.create_request_message(request_id).map_versioned(self.conn.version());
let msg = request.create_request_message(request_id).map_versioned(version);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest {
@@ -329,6 +348,11 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
self.inflight_requests.insert(request_id, req);
}
#[inline]
fn is_request_supported_for_version(request: &PeerRequest<N>, version: EthVersion) -> bool {
request.is_supported_by_eth_version(version)
}
/// Handle a message received from the internal network
fn on_internal_peer_message(&mut self, msg: PeerMessage<N>) {
match msg {
@@ -938,9 +962,9 @@ mod tests {
use reth_chainspec::MAINNET;
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockBodies,
HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
UnifiedStatus,
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockAccessLists,
GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream, UnifiedStatus,
};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
@@ -1240,6 +1264,22 @@ mod tests {
}
}
#[test]
fn test_reject_bal_request_for_eth70() {
let (tx, _rx) = oneshot::channel();
let request: PeerRequest<EthNetworkPrimitives> =
PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
assert!(!ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth70
));
assert!(ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
&request,
EthVersion::Eth71
));
}
#[tokio::test(flavor = "multi_thread")]
async fn test_keep_alive() {
let mut builder = SessionBuilder::default();

View File

@@ -1949,7 +1949,7 @@ impl PooledTransactionsHashesBuilder {
fn new(version: EthVersion) -> Self {
match version {
EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
Self::Eth68(Default::default())
}
}

View File

@@ -83,6 +83,7 @@ async fn main() -> eyre::Result<()> {
IncomingEthRequest::GetReceipts { .. } => {}
IncomingEthRequest::GetReceipts69 { .. } => {}
IncomingEthRequest::GetReceipts70 { .. } => {}
IncomingEthRequest::GetBlockAccessLists { .. } => {}
}
}
transaction_message = transactions_rx.recv() => {