feat: NetworkPrimitives (#12435)

This commit is contained in:
Arsenii Kulikov
2024-11-11 14:59:41 +04:00
committed by GitHub
parent b893a8879d
commit 365f6a1f69
10 changed files with 226 additions and 113 deletions

View File

@@ -5,8 +5,7 @@ use crate::HeadersDirection;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use alloy_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
use reth_codecs_derive::add_arbitrary_tests;
use reth_primitives::{BlockBody, Header};
use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
/// A request for a peer to return block headers starting at the requested block.
/// The peer must return at most [`limit`](#structfield.limit) headers.
@@ -41,34 +40,16 @@ pub struct GetBlockHeaders {
/// The response to [`GetBlockHeaders`], containing headers if any headers were found.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[add_arbitrary_tests(rlp, 10)]
pub struct BlockHeaders(
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct BlockHeaders<H = reth_primitives::Header>(
/// The requested headers.
pub Vec<Header>,
pub Vec<H>,
);
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for BlockHeaders {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
let headers_count: usize = u.int_in_range(0..=10)?;
let mut headers = Vec::with_capacity(headers_count);
generate_tests!(#[rlp, 10] BlockHeaders<reth_primitives::Header>, EthBlockHeadersTests);
for _ in 0..headers_count {
headers.push(reth_primitives::generate_valid_header(
u.arbitrary()?,
u.arbitrary()?,
u.arbitrary()?,
u.arbitrary()?,
u.arbitrary()?,
))
}
Ok(Self(headers))
}
}
impl From<Vec<Header>> for BlockHeaders {
fn from(headers: Vec<Header>) -> Self {
impl<H> From<Vec<H>> for BlockHeaders<H> {
fn from(headers: Vec<H>) -> Self {
Self(headers)
}
}
@@ -94,14 +75,15 @@ impl From<Vec<B256>> for GetBlockBodies {
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp, 16)]
pub struct BlockBodies(
pub struct BlockBodies<B = reth_primitives::BlockBody>(
/// The requested block bodies, each of which should correspond to a hash in the request.
pub Vec<BlockBody>,
pub Vec<B>,
);
impl From<Vec<BlockBody>> for BlockBodies {
fn from(bodies: Vec<BlockBody>) -> Self {
generate_tests!(#[rlp, 16] BlockBodies<reth_primitives::BlockBody>, EthBlockBodiesTests);
impl<B> From<Vec<B>> for BlockBodies<B> {
fn from(bodies: Vec<B>) -> Self {
Self(bodies)
}
}
@@ -116,11 +98,9 @@ mod tests {
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{hex, PrimitiveSignature as Signature, TxKind, U256};
use alloy_rlp::{Decodable, Encodable};
use reth_primitives::{Header, Transaction, TransactionSigned};
use reth_primitives::{BlockBody, Header, Transaction, TransactionSigned};
use std::str::FromStr;
use super::BlockBody;
#[test]
fn decode_hash() {
// this is a valid 32 byte rlp string
@@ -254,7 +234,7 @@ mod tests {
// [ (f90202) 0x0457 = 1111, [ (f901fc) [ (f901f9) header ] ] ]
let expected = hex!("f90202820457f901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000");
let mut data = vec![];
RequestPair::<BlockHeaders> {
RequestPair::<BlockHeaders<_>> {
request_id: 1111,
message: BlockHeaders(vec![
Header {
@@ -289,7 +269,7 @@ mod tests {
#[test]
fn decode_block_header() {
let data = hex!("f90202820457f901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000");
let expected = RequestPair::<BlockHeaders> {
let expected = RequestPair::<BlockHeaders<_>> {
request_id: 1111,
message: BlockHeaders(vec![
Header {
@@ -357,7 +337,7 @@ mod tests {
fn encode_block_bodies() {
let expected = hex!("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000");
let mut data = vec![];
let request = RequestPair::<BlockBodies> {
let request = RequestPair::<BlockBodies<_>> {
request_id: 1111,
message: BlockBodies(vec![
BlockBody {
@@ -428,7 +408,7 @@ mod tests {
#[test]
fn decode_block_bodies() {
let data = hex!("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000");
let expected = RequestPair::<BlockBodies> {
let expected = RequestPair::<BlockBodies<_>> {
request_id: 1111,
message: BlockBodies(vec![
BlockBody {
@@ -504,7 +484,7 @@ mod tests {
let body = BlockBodies::default();
let mut buf = Vec::new();
body.encode(&mut buf);
let decoded = BlockBodies::decode(&mut buf.as_slice()).unwrap();
let decoded = BlockBodies::<BlockBody>::decode(&mut buf.as_slice()).unwrap();
assert_eq!(body, decoded);
}
}

View File

@@ -1,14 +1,14 @@
//! Types for broadcasting new data.
use crate::{EthMessage, EthVersion};
use crate::{EthMessage, EthVersion, NetworkPrimitives};
use alloy_rlp::{
Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
};
use alloy_primitives::{Bytes, TxHash, B256, U128};
use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
use reth_codecs_derive::add_arbitrary_tests;
use reth_primitives::{Block, PooledTransactionsElement, TransactionSigned};
use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use std::{
collections::{HashMap, HashSet},
@@ -75,14 +75,15 @@ impl From<NewBlockHashes> for Vec<BlockHashNumber> {
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(rlp, 25)]
pub struct NewBlock {
pub struct NewBlock<B = reth_primitives::Block> {
/// A new block.
pub block: Block,
pub block: B,
/// The current total difficulty.
pub td: U128,
}
generate_tests!(#[rlp, 25] NewBlock<reth_primitives::Block>, EthNewBlockTests);
/// This informs peers of transactions that have appeared on the network and are not yet included
/// in a block.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
@@ -269,7 +270,7 @@ impl NewPooledTransactionHashes {
}
}
impl From<NewPooledTransactionHashes> for EthMessage {
impl<N: NetworkPrimitives> From<NewPooledTransactionHashes> for EthMessage<N> {
fn from(value: NewPooledTransactionHashes) -> Self {
match value {
NewPooledTransactionHashes::Eth66(msg) => Self::NewPooledTransactionHashes66(msg),

View File

@@ -40,3 +40,6 @@ pub use disconnect_reason::*;
pub mod capability;
pub use capability::*;
pub mod primitives;
pub use primitives::*;

View File

@@ -11,7 +11,7 @@ use super::{
GetNodeData, GetPooledTransactions, GetReceipts, NewBlock, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, Transactions,
};
use crate::{EthVersion, SharedTransactions};
use crate::{EthNetworkPrimitives, EthVersion, NetworkPrimitives, SharedTransactions};
use alloy_primitives::bytes::{Buf, BufMut};
use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
@@ -35,14 +35,18 @@ pub enum MessageError {
/// An `eth` protocol message, containing a message ID and payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ProtocolMessage {
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The unique identifier representing the type of the Ethereum message.
pub message_type: EthMessageID,
/// The content of the message, including specific data based on the message type.
pub message: EthMessage,
#[cfg_attr(
feature = "serde",
serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
)]
pub message: EthMessage<N>,
}
impl ProtocolMessage {
impl<N: NetworkPrimitives> ProtocolMessage<N> {
/// Create a new `ProtocolMessage` from a message type and message rlp bytes.
pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
let message_type = EthMessageID::decode(buf)?;
@@ -78,7 +82,7 @@ impl ProtocolMessage {
EthMessage::GetBlockHeaders(request_pair)
}
EthMessageID::BlockHeaders => {
let request_pair = RequestPair::<BlockHeaders>::decode(buf)?;
let request_pair = RequestPair::<BlockHeaders<_>>::decode(buf)?;
EthMessage::BlockHeaders(request_pair)
}
EthMessageID::GetBlockBodies => {
@@ -86,7 +90,7 @@ impl ProtocolMessage {
EthMessage::GetBlockBodies(request_pair)
}
EthMessageID::BlockBodies => {
let request_pair = RequestPair::<BlockBodies>::decode(buf)?;
let request_pair = RequestPair::<BlockBodies<_>>::decode(buf)?;
EthMessage::BlockBodies(request_pair)
}
EthMessageID::GetPooledTransactions => {
@@ -124,7 +128,7 @@ impl ProtocolMessage {
}
}
impl Encodable for ProtocolMessage {
impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
/// Encodes the protocol message into bytes. The message type is encoded as a single byte and
/// prepended to the message.
fn encode(&self, out: &mut dyn BufMut) {
@@ -136,23 +140,23 @@ impl Encodable for ProtocolMessage {
}
}
impl From<EthMessage> for ProtocolMessage {
fn from(message: EthMessage) -> Self {
impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
fn from(message: EthMessage<N>) -> Self {
Self { message_type: message.message_id(), message }
}
}
/// Represents messages that can be sent to multiple peers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ProtocolBroadcastMessage {
#[derive(Clone, Debug)]
pub struct ProtocolBroadcastMessage<N: NetworkPrimitives> {
/// The unique identifier representing the type of the Ethereum message.
pub message_type: EthMessageID,
/// The content of the message to be broadcasted, including specific data based on the message
/// type.
pub message: EthBroadcastMessage,
pub message: EthBroadcastMessage<N>,
}
impl Encodable for ProtocolBroadcastMessage {
impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
/// Encodes the protocol message into bytes. The message type is encoded as a single byte and
/// prepended to the message.
fn encode(&self, out: &mut dyn BufMut) {
@@ -164,8 +168,8 @@ impl Encodable for ProtocolBroadcastMessage {
}
}
impl From<EthBroadcastMessage> for ProtocolBroadcastMessage {
fn from(message: EthBroadcastMessage) -> Self {
impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
fn from(message: EthBroadcastMessage<N>) -> Self {
Self { message_type: message.message_id(), message }
}
}
@@ -189,13 +193,17 @@ impl From<EthBroadcastMessage> for ProtocolBroadcastMessage {
/// [`NewPooledTransactionHashes68`] is defined.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage {
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a Status message required for the protocol handshake.
Status(Status),
/// Represents a `NewBlockHashes` message broadcast to the network.
NewBlockHashes(NewBlockHashes),
/// Represents a `NewBlock` message broadcast to the network.
NewBlock(Box<NewBlock>),
#[cfg_attr(
feature = "serde",
serde(bound = "N::Block: serde::Serialize + serde::de::DeserializeOwned")
)]
NewBlock(Box<NewBlock<N::Block>>),
/// Represents a Transactions message broadcast to the network.
Transactions(Transactions),
/// Represents a `NewPooledTransactionHashes` message for eth/66 version.
@@ -206,11 +214,19 @@ pub enum EthMessage {
/// Represents a `GetBlockHeaders` request-response pair.
GetBlockHeaders(RequestPair<GetBlockHeaders>),
/// Represents a `BlockHeaders` request-response pair.
BlockHeaders(RequestPair<BlockHeaders>),
#[cfg_attr(
feature = "serde",
serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
)]
BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
/// Represents a `GetBlockBodies` request-response pair.
GetBlockBodies(RequestPair<GetBlockBodies>),
/// Represents a `BlockBodies` request-response pair.
BlockBodies(RequestPair<BlockBodies>),
#[cfg_attr(
feature = "serde",
serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
)]
BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
/// Represents a `GetPooledTransactions` request-response pair.
GetPooledTransactions(RequestPair<GetPooledTransactions>),
/// Represents a `PooledTransactions` request-response pair.
@@ -225,7 +241,7 @@ pub enum EthMessage {
Receipts(RequestPair<Receipts>),
}
impl EthMessage {
impl<N: NetworkPrimitives> EthMessage<N> {
/// Returns the message's ID.
pub const fn message_id(&self) -> EthMessageID {
match self {
@@ -250,7 +266,7 @@ impl EthMessage {
}
}
impl Encodable for EthMessage {
impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
fn encode(&self, out: &mut dyn BufMut) {
match self {
Self::Status(status) => status.encode(out),
@@ -301,16 +317,16 @@ impl Encodable for EthMessage {
///
/// Note: This is only useful for outgoing messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage {
pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Represents a new block broadcast message.
NewBlock(Arc<NewBlock>),
NewBlock(Arc<NewBlock<N::Block>>),
/// Represents a transactions broadcast message.
Transactions(SharedTransactions),
}
// === impl EthBroadcastMessage ===
impl EthBroadcastMessage {
impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
/// Returns the message's ID.
pub const fn message_id(&self) -> EthMessageID {
match self {
@@ -320,7 +336,7 @@ impl EthBroadcastMessage {
}
}
impl Encodable for EthBroadcastMessage {
impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
fn encode(&self, out: &mut dyn BufMut) {
match self {
Self::NewBlock(new_block) => new_block.encode(out),
@@ -502,8 +518,8 @@ where
mod tests {
use super::MessageError;
use crate::{
message::RequestPair, EthMessage, EthMessageID, EthVersion, GetNodeData, NodeData,
ProtocolMessage,
message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
GetNodeData, NodeData, ProtocolMessage,
};
use alloy_primitives::hex;
use alloy_rlp::{Decodable, Encodable, Error};
@@ -516,20 +532,30 @@ mod tests {
#[test]
fn test_removed_message_at_eth67() {
let get_node_data =
EthMessage::GetNodeData(RequestPair { request_id: 1337, message: GetNodeData(vec![]) });
let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
request_id: 1337,
message: GetNodeData(vec![]),
});
let buf = encode(ProtocolMessage {
message_type: EthMessageID::GetNodeData,
message: get_node_data,
});
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
crate::EthVersion::Eth67,
&mut &buf[..],
);
assert!(matches!(msg, Err(MessageError::Invalid(..))));
let node_data =
EthMessage::NodeData(RequestPair { request_id: 1337, message: NodeData(vec![]) });
let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
request_id: 1337,
message: NodeData(vec![]),
});
let buf =
encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
let msg = ProtocolMessage::decode_message(crate::EthVersion::Eth67, &mut &buf[..]);
let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
crate::EthVersion::Eth67,
&mut &buf[..],
);
assert!(matches!(msg, Err(MessageError::Invalid(..))));
}
@@ -578,10 +604,11 @@ mod tests {
#[test]
fn empty_block_bodies_protocol() {
let empty_block_bodies = ProtocolMessage::from(EthMessage::BlockBodies(RequestPair {
request_id: 0,
message: Default::default(),
}));
let empty_block_bodies =
ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
request_id: 0,
message: Default::default(),
}));
let mut buf = Vec::new();
empty_block_bodies.encode(&mut buf);
let decoded =

View File

@@ -0,0 +1,83 @@
//! Abstraction over primitive types in network messages.
use std::fmt::Debug;
use alloy_rlp::{Decodable, Encodable};
/// Abstraction over primitive types which might appear in network messages. See
/// [`crate::EthMessage`] for more context.
pub trait NetworkPrimitives:
Send + Sync + Unpin + Clone + Debug + PartialEq + Eq + 'static
{
/// The block header type.
type BlockHeader: Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
/// The block body type.
type BlockBody: Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
/// Full block type.
type Block: Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
/// The transaction type which peers announce in `Transactions` messages. It is different from
/// `PooledTransactions` to account for Ethereum case where EIP-4844 transactions are not being
/// announced and can only be explicitly requested from peers.
type BroadcastedTransaction: Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
/// The transaction type which peers return in `PooledTransactions` messages.
type PooledTransaction: Encodable
+ Decodable
+ Send
+ Sync
+ Unpin
+ Clone
+ Debug
+ PartialEq
+ Eq
+ 'static;
}
/// Primitive types used by Ethereum network.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct EthNetworkPrimitives;
impl NetworkPrimitives for EthNetworkPrimitives {
type BlockHeader = reth_primitives::Header;
type BlockBody = reth_primitives::BlockBody;
type Block = reth_primitives::Block;
type BroadcastedTransaction = reth_primitives::TransactionSigned;
type PooledTransaction = reth_primitives::PooledTransactionsElement;
}

View File

@@ -1,12 +1,11 @@
//! Implements the `GetPooledTransactions` and `PooledTransactions` message types.
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::B256;
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use derive_more::{Constructor, Deref, IntoIterator};
use reth_codecs_derive::add_arbitrary_tests;
use reth_primitives::{
transaction::TransactionConversionError, PooledTransactionsElement, TransactionSigned,
};
use reth_primitives::{transaction::TransactionConversionError, PooledTransactionsElement};
/// A list of transaction hashes that the peer would like transaction bodies for.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
@@ -42,38 +41,46 @@ where
Eq,
RlpEncodableWrapper,
RlpDecodableWrapper,
Default,
IntoIterator,
Deref,
Constructor,
)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PooledTransactions(
pub struct PooledTransactions<T = PooledTransactionsElement>(
/// The transaction bodies, each of which should correspond to a requested hash.
pub Vec<PooledTransactionsElement>,
pub Vec<T>,
);
impl PooledTransactions {
impl<T: Encodable2718> PooledTransactions<T> {
/// Returns an iterator over the transaction hashes in this response.
pub fn hashes(&self) -> impl Iterator<Item = &B256> + '_ {
self.0.iter().map(|tx| tx.hash())
pub fn hashes(&self) -> impl Iterator<Item = B256> + '_ {
self.0.iter().map(|tx| tx.trie_hash())
}
}
impl TryFrom<Vec<TransactionSigned>> for PooledTransactions {
impl<T, U> TryFrom<Vec<U>> for PooledTransactions<T>
where
T: TryFrom<U, Error = TransactionConversionError>,
{
type Error = TransactionConversionError;
fn try_from(txs: Vec<TransactionSigned>) -> Result<Self, Self::Error> {
txs.into_iter().map(PooledTransactionsElement::try_from).collect()
fn try_from(txs: Vec<U>) -> Result<Self, Self::Error> {
txs.into_iter().map(T::try_from).collect()
}
}
impl FromIterator<PooledTransactionsElement> for PooledTransactions {
fn from_iter<I: IntoIterator<Item = PooledTransactionsElement>>(iter: I) -> Self {
impl<T> FromIterator<T> for PooledTransactions<T> {
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
Self(iter.into_iter().collect())
}
}
impl<T> Default for PooledTransactions<T> {
fn default() -> Self {
Self(Default::default())
}
}
#[cfg(test)]
mod tests {
use crate::{message::RequestPair, GetPooledTransactions, PooledTransactions};

View File

@@ -2,7 +2,8 @@ use crate::{
errors::{EthHandshakeError, EthStreamError},
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthVersion, ProtocolMessage, Status,
CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
Status,
};
use alloy_primitives::bytes::{Bytes, BytesMut};
use futures::{ready, Sink, SinkExt, StreamExt};
@@ -87,7 +88,12 @@ where
// we need to encode and decode here on our own because we don't have an `EthStream` yet
// The max length for a status with TTD is: <msg id = 1 byte> + <rlp(status) = 88 byte>
self.inner
.send(alloy_rlp::encode(ProtocolMessage::from(EthMessage::Status(status))).into())
.send(
alloy_rlp::encode(ProtocolMessage::from(
EthMessage::<EthNetworkPrimitives>::Status(status),
))
.into(),
)
.await?;
let their_msg_res = self.inner.next().await;
@@ -106,14 +112,15 @@ where
}
let version = status.version;
let msg = match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::InvalidMessage(err))
}
};
let msg: ProtocolMessage =
match ProtocolMessage::decode_message(version, &mut their_msg.as_ref()) {
Ok(m) => m,
Err(err) => {
debug!("decode error in eth handshake: msg={their_msg:x}");
self.inner.disconnect(DisconnectReason::DisconnectRequested).await?;
return Err(EthStreamError::InvalidMessage(err))
}
};
// The following checks should match the checks in go-ethereum:
// https://github.com/ethereum/go-ethereum/blob/9244d5cd61f3ea5a7645fdf2a1a96d53421e412f/eth/protocols/eth/handshake.go#L87-L89

View File

@@ -11,7 +11,7 @@ fn decode_new_block_network() {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata/new_block_network_rlp");
let data = fs::read_to_string(network_data_path).expect("Unable to read file");
let hex_data = hex::decode(data.trim()).unwrap();
let _txs = NewBlock::decode(&mut &hex_data[..]).unwrap();
let _txs: NewBlock = NewBlock::decode(&mut &hex_data[..]).unwrap();
}
#[test]
@@ -20,7 +20,7 @@ fn decode_new_block_network_bsc_one() {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata/bsc_new_block_network_one");
let data = fs::read_to_string(network_data_path).expect("Unable to read file");
let hex_data = hex::decode(data.trim()).unwrap();
let _txs = NewBlock::decode(&mut &hex_data[..]).unwrap();
let _txs: NewBlock = NewBlock::decode(&mut &hex_data[..]).unwrap();
}
#[test]
@@ -29,5 +29,5 @@ fn decode_new_block_network_bsc_two() {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("testdata/bsc_new_block_network_two");
let data = fs::read_to_string(network_data_path).expect("Unable to read file");
let hex_data = hex::decode(data.trim()).unwrap();
let _txs = NewBlock::decode(&mut &hex_data[..]).unwrap();
let _txs: NewBlock = NewBlock::decode(&mut &hex_data[..]).unwrap();
}

View File

@@ -12,7 +12,7 @@ use test_fuzz::test_fuzz;
#[test_fuzz]
fn roundtrip_pooled_transactions(hex_data: Vec<u8>) -> Result<(), alloy_rlp::Error> {
let input_rlp = &mut &hex_data[..];
let txs = match PooledTransactions::decode(input_rlp) {
let txs: PooledTransactions = match PooledTransactions::decode(input_rlp) {
Ok(txs) => txs,
Err(e) => return Err(e),
};
@@ -28,7 +28,7 @@ fn roundtrip_pooled_transactions(hex_data: Vec<u8>) -> Result<(), alloy_rlp::Err
assert_eq!(expected_encoding, buf);
// now do another decoding, on what we encoded - this should succeed
let txs2 = PooledTransactions::decode(&mut &buf[..]).unwrap();
let txs2: PooledTransactions = PooledTransactions::decode(&mut &buf[..]).unwrap();
// ensure that the payload length is the same
assert_eq!(txs.length(), txs2.length());
@@ -54,7 +54,8 @@ fn decode_request_pair_pooled_blob_transactions() {
.join("testdata/request_pair_pooled_blob_transactions");
let data = fs::read_to_string(network_data_path).expect("Unable to read file");
let hex_data = hex::decode(data.trim()).unwrap();
let _txs = ProtocolMessage::decode_message(EthVersion::Eth68, &mut &hex_data[..]).unwrap();
let _txs: ProtocolMessage =
ProtocolMessage::decode_message(EthVersion::Eth68, &mut &hex_data[..]).unwrap();
}
#[test]

View File

@@ -434,6 +434,10 @@ impl Encodable2718 for PooledTransactionsElement {
}
}
}
fn trie_hash(&self) -> B256 {
*self.hash()
}
}
impl Decodable2718 for PooledTransactionsElement {