chore: eth69 status message support (#16099)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Shane K Moore
2025-05-22 03:08:26 -07:00
committed by GitHub
parent f18273fb55
commit e4212a4028
16 changed files with 451 additions and 307 deletions

View File

@@ -12,7 +12,7 @@
extern crate alloc;
mod status;
pub use status::{Status, StatusBuilder, StatusEth69, StatusMessage};
pub use status::{Status, StatusBuilder, StatusEth69, StatusMessage, UnifiedStatus};
pub mod version;
pub use version::{EthVersion, ProtocolVersion};

View File

@@ -7,6 +7,205 @@ use core::fmt::{Debug, Display};
use reth_chainspec::{EthChainSpec, Hardforks, MAINNET};
use reth_codecs_derive::add_arbitrary_tests;
/// `UnifiedStatus` is an internal superset of all ETH status fields for all `eth/` versions.
///
/// This type can be converted into [`Status`] or [`StatusEth69`] depending on the version and
/// unsupported fields are stripped out.
#[derive(Clone, Debug, PartialEq, Eq, Copy)]
pub struct UnifiedStatus {
/// The eth protocol version (e.g. eth/66 to eth/69).
pub version: EthVersion,
/// The chain ID identifying the peers network.
pub chain: Chain,
/// The genesis block hash of the peers chain.
pub genesis: B256,
/// The fork ID as defined by EIP-2124.
pub forkid: ForkId,
/// The latest block hash known to the peer.
pub blockhash: B256,
/// The total difficulty of the peers best chain (eth/6668 only).
pub total_difficulty: Option<U256>,
/// The earliest block this node can serve (eth/69 only).
pub earliest_block: Option<u64>,
/// The latest block number this node has (eth/69 only).
pub latest_block: Option<u64>,
}
impl Default for UnifiedStatus {
fn default() -> Self {
let mainnet_genesis = MAINNET.genesis_hash();
Self {
version: EthVersion::Eth68,
chain: Chain::from_named(NamedChain::Mainnet),
genesis: mainnet_genesis,
forkid: MAINNET
.hardfork_fork_id(EthereumHardfork::Frontier)
.expect("Frontier must exist"),
blockhash: mainnet_genesis,
total_difficulty: Some(U256::from(17_179_869_184u64)),
earliest_block: Some(0),
latest_block: Some(0),
}
}
}
impl UnifiedStatus {
/// Helper for creating the `UnifiedStatus` builder
pub fn builder() -> StatusBuilder {
Default::default()
}
/// Build from chainspec + head. Earliest/latest default to full history.
pub fn spec_builder<Spec>(spec: &Spec, head: &Head) -> Self
where
Spec: EthChainSpec + Hardforks,
{
Self::builder()
.chain(spec.chain())
.genesis(spec.genesis_hash())
.forkid(spec.fork_id(head))
.blockhash(head.hash)
.total_difficulty(Some(head.total_difficulty))
.earliest_block(Some(0))
.latest_block(Some(head.number))
.build()
}
/// Override the `(earliest, latest)` history range well advertise to
/// eth/69 peers.
pub const fn set_history_range(&mut self, earliest: u64, latest: u64) {
self.earliest_block = Some(earliest);
self.latest_block = Some(latest);
}
/// Sets the [`EthVersion`] for the status.
pub const fn set_eth_version(&mut self, v: EthVersion) {
self.version = v;
}
/// Consume this `UnifiedStatus` and produce the legacy [`Status`] message used by all
/// `eth/66``eth/68`.
pub fn into_legacy(self) -> Status {
Status {
version: self.version,
chain: self.chain,
genesis: self.genesis,
forkid: self.forkid,
blockhash: self.blockhash,
total_difficulty: self.total_difficulty.unwrap_or(U256::ZERO),
}
}
/// Consume this `UnifiedStatus` and produce the [`StatusEth69`] message used by `eth/69`.
pub fn into_eth69(self) -> StatusEth69 {
StatusEth69 {
version: self.version,
chain: self.chain,
genesis: self.genesis,
forkid: self.forkid,
earliest: self.earliest_block.unwrap_or(0),
latest: self.latest_block.unwrap_or(0),
blockhash: self.blockhash,
}
}
/// Convert this `UnifiedStatus` into the appropriate `StatusMessage` variant based on version.
pub fn into_message(self) -> StatusMessage {
if self.version == EthVersion::Eth69 {
StatusMessage::Eth69(self.into_eth69())
} else {
StatusMessage::Legacy(self.into_legacy())
}
}
/// Build a `UnifiedStatus` from a received `StatusMessage`.
pub const fn from_message(msg: StatusMessage) -> Self {
match msg {
StatusMessage::Legacy(s) => Self {
version: s.version,
chain: s.chain,
genesis: s.genesis,
forkid: s.forkid,
blockhash: s.blockhash,
total_difficulty: Some(s.total_difficulty),
earliest_block: None,
latest_block: None,
},
StatusMessage::Eth69(e) => Self {
version: e.version,
chain: e.chain,
genesis: e.genesis,
forkid: e.forkid,
blockhash: e.blockhash,
total_difficulty: None,
earliest_block: Some(e.earliest),
latest_block: Some(e.latest),
},
}
}
}
/// Builder type for constructing a [`UnifiedStatus`] message.
#[derive(Debug, Default)]
pub struct StatusBuilder {
status: UnifiedStatus,
}
impl StatusBuilder {
/// Consumes the builder and returns the constructed [`UnifiedStatus`].
pub const fn build(self) -> UnifiedStatus {
self.status
}
/// Sets the eth protocol version (e.g., eth/66, eth/69).
pub const fn version(mut self, version: EthVersion) -> Self {
self.status.version = version;
self
}
/// Sets the chain ID
pub const fn chain(mut self, chain: Chain) -> Self {
self.status.chain = chain;
self
}
/// Sets the genesis block hash of the chain.
pub const fn genesis(mut self, genesis: B256) -> Self {
self.status.genesis = genesis;
self
}
/// Sets the fork ID, used for fork compatibility checks.
pub const fn forkid(mut self, forkid: ForkId) -> Self {
self.status.forkid = forkid;
self
}
/// Sets the block hash of the current head.
pub const fn blockhash(mut self, blockhash: B256) -> Self {
self.status.blockhash = blockhash;
self
}
/// Sets the total difficulty, if relevant (Some for eth/6668).
pub const fn total_difficulty(mut self, td: Option<U256>) -> Self {
self.status.total_difficulty = td;
self
}
/// Sets the earliest available block, if known (Some for eth/69).
pub const fn earliest_block(mut self, earliest: Option<u64>) -> Self {
self.status.earliest_block = earliest;
self
}
/// Sets the latest known block, if known (Some for eth/69).
pub const fn latest_block(mut self, latest: Option<u64>) -> Self {
self.status.latest_block = latest;
self
}
}
/// The status message is used in the eth protocol handshake to ensure that peers are on the same
/// network and are following the same fork.
///
@@ -42,41 +241,19 @@ pub struct Status {
pub forkid: ForkId,
}
impl Status {
/// Helper for returning a builder for the status message.
pub fn builder() -> StatusBuilder {
Default::default()
}
/// Sets the [`EthVersion`] for the status.
pub const fn set_eth_version(&mut self, version: EthVersion) {
self.version = version;
}
/// Create a [`StatusBuilder`] from the given [`EthChainSpec`] and head block.
///
/// Sets the `chain` and `genesis`, `blockhash`, and `forkid` fields based on the
/// [`EthChainSpec`] and head.
pub fn spec_builder<Spec>(spec: Spec, head: &Head) -> StatusBuilder
where
Spec: EthChainSpec + Hardforks,
{
Self::builder()
.chain(spec.chain())
.genesis(spec.genesis_hash())
.blockhash(head.hash)
.total_difficulty(head.total_difficulty)
.forkid(spec.fork_id(head))
}
/// Converts this [`Status`] into the [Eth69](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md) variant that excludes the total difficulty field.
pub const fn into_eth69(self) -> StatusEth69 {
StatusEth69 {
version: EthVersion::Eth69,
chain: self.chain,
blockhash: self.blockhash,
genesis: self.genesis,
forkid: self.forkid,
// <https://etherscan.io/block/0>
impl Default for Status {
fn default() -> Self {
let mainnet_genesis = MAINNET.genesis_hash();
Self {
version: EthVersion::Eth68,
chain: Chain::from_named(NamedChain::Mainnet),
total_difficulty: U256::from(17_179_869_184u64),
blockhash: mainnet_genesis,
genesis: mainnet_genesis,
forkid: MAINNET
.hardfork_fork_id(EthereumHardfork::Frontier)
.expect("The Frontier hardfork should always exist"),
}
}
}
@@ -128,102 +305,6 @@ impl Debug for Status {
}
}
// <https://etherscan.io/block/0>
impl Default for Status {
fn default() -> Self {
let mainnet_genesis = MAINNET.genesis_hash();
Self {
version: EthVersion::Eth68,
chain: Chain::from_named(NamedChain::Mainnet),
total_difficulty: U256::from(17_179_869_184u64),
blockhash: mainnet_genesis,
genesis: mainnet_genesis,
forkid: MAINNET
.hardfork_fork_id(EthereumHardfork::Frontier)
.expect("The Frontier hardfork should always exist"),
}
}
}
/// Builder for [`Status`] messages.
///
/// # Example
/// ```
/// use alloy_consensus::constants::MAINNET_GENESIS_HASH;
/// use alloy_primitives::{B256, U256};
/// use reth_chainspec::{Chain, EthereumHardfork, MAINNET};
/// use reth_eth_wire_types::{EthVersion, Status};
///
/// // this is just an example status message!
/// let status = Status::builder()
/// .version(EthVersion::Eth66)
/// .chain(Chain::mainnet())
/// .total_difficulty(U256::from(100))
/// .blockhash(B256::from(MAINNET_GENESIS_HASH))
/// .genesis(B256::from(MAINNET_GENESIS_HASH))
/// .forkid(MAINNET.hardfork_fork_id(EthereumHardfork::Paris).unwrap())
/// .build();
///
/// assert_eq!(
/// status,
/// Status {
/// version: EthVersion::Eth66,
/// chain: Chain::mainnet(),
/// total_difficulty: U256::from(100),
/// blockhash: B256::from(MAINNET_GENESIS_HASH),
/// genesis: B256::from(MAINNET_GENESIS_HASH),
/// forkid: MAINNET.hardfork_fork_id(EthereumHardfork::Paris).unwrap(),
/// }
/// );
/// ```
#[derive(Debug, Default)]
pub struct StatusBuilder {
status: Status,
}
impl StatusBuilder {
/// Consumes the type and creates the actual [`Status`] message.
pub const fn build(self) -> Status {
self.status
}
/// Sets the protocol version.
pub const fn version(mut self, version: EthVersion) -> Self {
self.status.version = version;
self
}
/// Sets the chain id.
pub const fn chain(mut self, chain: Chain) -> Self {
self.status.chain = chain;
self
}
/// Sets the total difficulty.
pub const fn total_difficulty(mut self, total_difficulty: U256) -> Self {
self.status.total_difficulty = total_difficulty;
self
}
/// Sets the block hash.
pub const fn blockhash(mut self, blockhash: B256) -> Self {
self.status.blockhash = blockhash;
self
}
/// Sets the genesis hash.
pub const fn genesis(mut self, genesis: B256) -> Self {
self.status.genesis = genesis;
self
}
/// Sets the fork id.
pub const fn forkid(mut self, forkid: ForkId) -> Self {
self.status.forkid = forkid;
self
}
}
/// Similar to [`Status`], but for `eth/69` version, which does not contain
/// the `total_difficulty` field.
#[derive(Copy, Clone, PartialEq, Eq, RlpEncodable, RlpDecodable)]
@@ -239,9 +320,6 @@ pub struct StatusEth69 {
/// [EIP155](https://eips.ethereum.org/EIPS/eip-155#list-of-chain-ids).
pub chain: Chain,
/// The highest difficulty block hash the peer has seen
pub blockhash: B256,
/// The genesis hash of the peer's chain.
pub genesis: B256,
@@ -251,6 +329,15 @@ pub struct StatusEth69 {
/// [EIP-2124](https://github.com/ethereum/EIPs/blob/master/EIPS/eip-2124.md).
/// This was added in [`eth/64`](https://eips.ethereum.org/EIPS/eip-2364)
pub forkid: ForkId,
/// Earliest block number this node can serve
pub earliest: u64,
/// Latest block number this node has (current head)
pub latest: u64,
/// Hash of the latest block this node has (current head)
pub blockhash: B256,
}
impl Display for StatusEth69 {
@@ -259,8 +346,14 @@ impl Display for StatusEth69 {
let hexed_genesis = hex::encode(self.genesis);
write!(
f,
"Status {{ version: {}, chain: {}, blockhash: {}, genesis: {}, forkid: {:X?} }}",
self.version, self.chain, hexed_blockhash, hexed_genesis, self.forkid
"StatusEth69 {{ version: {}, chain: {}, genesis: {}, forkid: {:X?}, earliest: {}, latest: {}, blockhash: {} }}",
self.version,
self.chain,
hexed_genesis,
self.forkid,
self.earliest,
self.latest,
hexed_blockhash,
)
}
}
@@ -285,19 +378,6 @@ impl Debug for StatusEth69 {
}
}
// <https://etherscan.io/block/0>
impl Default for StatusEth69 {
fn default() -> Self {
Status::default().into()
}
}
impl From<Status> for StatusEth69 {
fn from(status: Status) -> Self {
status.into_eth69()
}
}
/// `StatusMessage` can store either the Legacy version (with TD) or the
/// eth/69 version (omits TD).
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
@@ -342,21 +422,11 @@ impl StatusMessage {
}
}
/// Converts to legacy Status since full support for EIP-7642
/// is not fully implemented
/// `<https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md>`
pub fn to_legacy(self) -> Status {
/// Returns the latest block hash
pub const fn blockhash(&self) -> B256 {
match self {
Self::Legacy(legacy_status) => legacy_status,
Self::Eth69(status_69) => Status {
version: status_69.version,
chain: status_69.chain,
// total_difficulty is omitted in Eth69.
total_difficulty: U256::default(),
blockhash: status_69.blockhash,
genesis: status_69.genesis,
forkid: status_69.forkid,
},
Self::Legacy(legacy_status) => legacy_status.blockhash,
Self::Eth69(status_69) => status_69.blockhash,
}
}
}
@@ -377,9 +447,17 @@ impl Encodable for StatusMessage {
}
}
impl Display for StatusMessage {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::Legacy(s) => Display::fmt(s, f),
Self::Eth69(s69) => Display::fmt(s69, f),
}
}
}
#[cfg(test)]
mod tests {
use crate::{EthVersion, Status, StatusEth69};
use crate::{EthVersion, Status, StatusEth69, StatusMessage, UnifiedStatus};
use alloy_consensus::constants::MAINNET_GENESIS_HASH;
use alloy_genesis::Genesis;
use alloy_hardforks::{EthereumHardfork, ForkHash, ForkId, Head};
@@ -432,62 +510,83 @@ mod tests {
}
#[test]
fn test_status_to_statuseth69_conversion() {
let status = StatusEth69 {
version: EthVersion::Eth69,
chain: Chain::from_named(NamedChain::Mainnet),
blockhash: B256::from_str(
"feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d",
fn roundtrip_eth69() {
let unified_status = UnifiedStatus::builder()
.version(EthVersion::Eth69)
.chain(Chain::mainnet())
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
.blockhash(
B256::from_str("feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")
.unwrap(),
)
.unwrap(),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
};
let status_converted: StatusEth69 = Status {
version: EthVersion::Eth69,
chain: Chain::from_named(NamedChain::Mainnet),
total_difficulty: U256::from(36206751599115524359527u128),
blockhash: B256::from_str(
"feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d",
.earliest_block(Some(1))
.latest_block(Some(2))
.total_difficulty(None)
.build();
let status_message = unified_status.into_message();
let roundtripped_unified_status = UnifiedStatus::from_message(status_message);
assert_eq!(unified_status, roundtripped_unified_status);
}
#[test]
fn roundtrip_legacy() {
let unified_status = UnifiedStatus::builder()
.version(EthVersion::Eth68)
.chain(Chain::sepolia())
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 })
.blockhash(
B256::from_str("feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")
.unwrap(),
)
.unwrap(),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
}
.into();
assert_eq!(status, status_converted);
.total_difficulty(Some(U256::from(42u64)))
.earliest_block(None)
.latest_block(None)
.build();
let status_message = unified_status.into_message();
let roundtripped_unified_status = UnifiedStatus::from_message(status_message);
assert_eq!(unified_status, roundtripped_unified_status);
}
#[test]
fn encode_eth69_status_message() {
let expected = hex!(
"f84b4501a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13da0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d80"
);
let expected = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d");
let status = StatusEth69 {
version: EthVersion::Eth69,
chain: Chain::from_named(NamedChain::Mainnet),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
earliest: 15_537_394,
latest: 18_000_000,
blockhash: B256::from_str(
"feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d",
)
.unwrap(),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
};
let mut rlp_status = vec![];
status.encode(&mut rlp_status);
assert_eq!(rlp_status, expected);
let status: StatusEth69 = Status::builder()
let status = UnifiedStatus::builder()
.version(EthVersion::Eth69)
.chain(Chain::from_named(NamedChain::Mainnet))
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
.blockhash(
B256::from_str("feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")
.unwrap(),
)
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
.earliest_block(Some(15_537_394))
.latest_block(Some(18_000_000))
.build()
.into();
.into_message();
let mut rlp_status = vec![];
status.encode(&mut rlp_status);
assert_eq!(rlp_status, expected);
@@ -495,21 +594,43 @@ mod tests {
#[test]
fn decode_eth69_status_message() {
let data = hex!(
"0xf84b4501a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13da0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d80"
);
let data = hex!("f8544501a0d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3c684b715077d8083ed14f2840112a880a0feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d");
let expected = StatusEth69 {
version: EthVersion::Eth69,
chain: Chain::from_named(NamedChain::Mainnet),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
earliest: 15_537_394,
latest: 18_000_000,
blockhash: B256::from_str(
"feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d",
)
.unwrap(),
genesis: MAINNET_GENESIS_HASH,
forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
};
let status = StatusEth69::decode(&mut &data[..]).unwrap();
assert_eq!(status, expected);
let expected_message = UnifiedStatus::builder()
.version(EthVersion::Eth69)
.chain(Chain::from_named(NamedChain::Mainnet))
.genesis(MAINNET_GENESIS_HASH)
.forkid(ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 })
.earliest_block(Some(15_537_394))
.latest_block(Some(18_000_000))
.blockhash(
B256::from_str("feb27336ca7923f8fab3bd617fcb6e75841538f71c1bcfc267d7838489d9e13d")
.unwrap(),
)
.build()
.into_message();
let expected_status = if let StatusMessage::Eth69(status69) = expected_message {
status69
} else {
panic!("expected StatusMessage::Eth69 variant");
};
assert_eq!(status, expected_status);
}
#[test]
@@ -634,11 +755,11 @@ mod tests {
let forkid = ForkId { hash: forkhash, next: 0 };
let status = Status::spec_builder(&spec, &head).build();
let status = UnifiedStatus::spec_builder(&spec, &head);
assert_eq!(status.chain, Chain::from_id(1337));
assert_eq!(status.forkid, forkid);
assert_eq!(status.total_difficulty, total_difficulty);
assert_eq!(status.total_difficulty.unwrap(), total_difficulty);
assert_eq!(status.blockhash, head_hash);
assert_eq!(status.genesis, genesis_hash);
}

View File

@@ -10,7 +10,7 @@ use crate::{
message::{EthBroadcastMessage, ProtocolBroadcastMessage},
p2pstream::HANDSHAKE_TIMEOUT,
CanDisconnect, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, ProtocolMessage,
Status,
UnifiedStatus,
};
use alloy_primitives::bytes::{Bytes, BytesMut};
use alloy_rlp::Encodable;
@@ -66,19 +66,19 @@ where
/// remote peer.
pub async fn handshake<N: NetworkPrimitives>(
self,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
) -> Result<(EthStream<S, N>, Status), EthStreamError> {
) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
self.handshake_with_timeout(status, fork_filter, HANDSHAKE_TIMEOUT).await
}
/// Wrapper around handshake which enforces a timeout.
pub async fn handshake_with_timeout<N: NetworkPrimitives>(
self,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Result<(EthStream<S, N>, Status), EthStreamError> {
) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
timeout(timeout_limit, Self::handshake_without_timeout(self, status, fork_filter))
.await
.map_err(|_| EthStreamError::StreamTimeout)?
@@ -87,20 +87,21 @@ where
/// Handshake with no timeout
pub async fn handshake_without_timeout<N: NetworkPrimitives>(
mut self,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
) -> Result<(EthStream<S, N>, Status), EthStreamError> {
) -> Result<(EthStream<S, N>, UnifiedStatus), EthStreamError> {
trace!(
%status,
status = %status.into_message(),
"sending eth status to peer"
);
EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
let their_status =
EthereumEthHandshake(&mut self.inner).eth_handshake(status, fork_filter).await?;
// now we can create the `EthStream` because the peer has successfully completed
// the handshake
let stream = EthStream::new(status.version, self.inner);
Ok((stream, status))
Ok((stream, their_status))
}
}
@@ -328,14 +329,14 @@ mod tests {
hello::DEFAULT_TCP_PORT,
p2pstream::UnauthedP2PStream,
EthMessage, EthStream, EthVersion, HelloMessageWithProtocols, PassthroughCodec,
ProtocolVersion, Status,
ProtocolVersion, Status, StatusMessage,
};
use alloy_chains::NamedChain;
use alloy_primitives::{bytes::Bytes, B256, U256};
use alloy_rlp::Decodable;
use futures::{SinkExt, StreamExt};
use reth_ecies::stream::ECIESStream;
use reth_eth_wire_types::EthNetworkPrimitives;
use reth_eth_wire_types::{EthNetworkPrimitives, UnifiedStatus};
use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::pk2id;
use secp256k1::{SecretKey, SECP256K1};
@@ -357,11 +358,12 @@ mod tests {
// Pass the current fork id.
forkid: fork_filter.current(),
};
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let status_clone = status;
let status_clone = unified_status;
let fork_filter_clone = fork_filter.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
@@ -381,12 +383,12 @@ mod tests {
// try to connect
let (_, their_status) = UnauthedEthStream::new(sink)
.handshake::<EthNetworkPrimitives>(status, fork_filter)
.handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
.await
.unwrap();
// their status is a clone of our status, these should be equal
assert_eq!(their_status, status);
assert_eq!(their_status, unified_status);
// wait for it to finish
handle.await.unwrap();
@@ -406,11 +408,12 @@ mod tests {
// Pass the current fork id.
forkid: fork_filter.current(),
};
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let status_clone = status;
let status_clone = unified_status;
let fork_filter_clone = fork_filter.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
@@ -430,12 +433,12 @@ mod tests {
// try to connect
let (_, their_status) = UnauthedEthStream::new(sink)
.handshake::<EthNetworkPrimitives>(status, fork_filter)
.handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
.await
.unwrap();
// their status is a clone of our status, these should be equal
assert_eq!(their_status, status);
assert_eq!(their_status, unified_status);
// await the other handshake
handle.await.unwrap();
@@ -455,11 +458,12 @@ mod tests {
// Pass the current fork id.
forkid: fork_filter.current(),
};
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let status_clone = status;
let status_clone = unified_status;
let fork_filter_clone = fork_filter.clone();
let handle = tokio::spawn(async move {
// roughly based off of the design of tokio::net::TcpListener
@@ -483,7 +487,7 @@ mod tests {
// try to connect
let handshake_res = UnauthedEthStream::new(sink)
.handshake::<EthNetworkPrimitives>(status, fork_filter)
.handshake::<EthNetworkPrimitives>(unified_status, fork_filter)
.await;
// this handshake should also fail due to td too high
@@ -599,8 +603,9 @@ mod tests {
// Pass the current fork id.
forkid: fork_filter.current(),
};
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
let status_copy = status;
let status_copy = unified_status;
let fork_filter_clone = fork_filter.clone();
let test_msg_clone = test_msg.clone();
let handle = tokio::spawn(async move {
@@ -647,8 +652,10 @@ mod tests {
let unauthed_stream = UnauthedP2PStream::new(sink);
let (p2p_stream, _) = unauthed_stream.handshake(client_hello).await.unwrap();
let (mut client_stream, _) =
UnauthedEthStream::new(p2p_stream).handshake(status, fork_filter).await.unwrap();
let (mut client_stream, _) = UnauthedEthStream::new(p2p_stream)
.handshake(unified_status, fork_filter)
.await
.unwrap();
client_stream.send(test_msg).await.unwrap();
@@ -670,11 +677,12 @@ mod tests {
// Pass the current fork id.
forkid: fork_filter.current(),
};
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let status_clone = status;
let status_clone = unified_status;
let fork_filter_clone = fork_filter.clone();
let _handle = tokio::spawn(async move {
// Delay accepting the connection for longer than the client's timeout period
@@ -697,7 +705,7 @@ mod tests {
// try to connect
let handshake_result = UnauthedEthStream::new(sink)
.handshake_with_timeout::<EthNetworkPrimitives>(
status,
unified_status,
fork_filter,
Duration::from_secs(1),
)

View File

@@ -6,7 +6,8 @@ use crate::{
use bytes::{Bytes, BytesMut};
use futures::{Sink, SinkExt, Stream};
use reth_eth_wire_types::{
DisconnectReason, EthMessage, EthNetworkPrimitives, ProtocolMessage, Status, StatusMessage,
DisconnectReason, EthMessage, EthNetworkPrimitives, ProtocolMessage, StatusMessage,
UnifiedStatus,
};
use reth_ethereum_forks::ForkFilter;
use reth_primitives_traits::GotExpected;
@@ -21,10 +22,10 @@ pub trait EthRlpxHandshake: Debug + Send + Sync + 'static {
fn handshake<'a>(
&'a self,
unauth: &'a mut dyn UnauthEth,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Pin<Box<dyn Future<Output = Result<Status, EthStreamError>> + 'a + Send>>;
) -> Pin<Box<dyn Future<Output = Result<UnifiedStatus, EthStreamError>> + 'a + Send>>;
}
/// An unauthenticated stream that can send and receive messages.
@@ -57,10 +58,10 @@ impl EthRlpxHandshake for EthHandshake {
fn handshake<'a>(
&'a self,
unauth: &'a mut dyn UnauthEth,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Pin<Box<dyn Future<Output = Result<Status, EthStreamError>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<UnifiedStatus, EthStreamError>> + 'a + Send>> {
Box::pin(async move {
timeout(timeout_limit, EthereumEthHandshake(unauth).eth_handshake(status, fork_filter))
.await
@@ -81,18 +82,18 @@ where
/// Performs the `eth` rlpx protocol handshake using the given input stream.
pub async fn eth_handshake(
self,
status: Status,
unified_status: UnifiedStatus,
fork_filter: ForkFilter,
) -> Result<Status, EthStreamError> {
) -> Result<UnifiedStatus, EthStreamError> {
let unauth = self.0;
let status = unified_status.into_message();
// Send our status message
let status_msg =
alloy_rlp::encode(ProtocolMessage::<EthNetworkPrimitives>::from(EthMessage::<
EthNetworkPrimitives,
>::Status(
StatusMessage::Legacy(status),
)))
.into();
let status_msg = alloy_rlp::encode(ProtocolMessage::<EthNetworkPrimitives>::from(
EthMessage::Status(status),
))
.into();
unauth.send(status_msg).await.map_err(EthStreamError::from)?;
// Receive peer's response
@@ -117,7 +118,7 @@ where
return Err(EthStreamError::MessageTooBig(their_msg.len()));
}
let version = status.version;
let version = status.version();
let msg = match ProtocolMessage::<EthNetworkPrimitives>::decode_message(
version,
&mut their_msg.as_ref(),
@@ -138,14 +139,14 @@ where
EthMessage::Status(their_status_message) => {
trace!("Validating incoming ETH status from peer");
if status.genesis != their_status_message.genesis() {
if status.genesis() != their_status_message.genesis() {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedGenesis(
GotExpected {
expected: status.genesis,
expected: status.genesis(),
got: their_status_message.genesis(),
}
.into(),
@@ -153,41 +154,43 @@ where
.into());
}
if status.version != their_status_message.version() {
if status.version() != their_status_message.version() {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedProtocolVersion(GotExpected {
got: their_status_message.version(),
expected: status.version,
expected: status.version(),
})
.into());
}
if status.chain != *their_status_message.chain() {
if *status.chain() != *their_status_message.chain() {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::MismatchedChain(GotExpected {
got: *their_status_message.chain(),
expected: status.chain,
expected: *status.chain(),
})
.into());
}
// Ensure total difficulty is reasonable
if status.total_difficulty.bit_len() > 160 {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: status.total_difficulty.bit_len(),
maximum: 160,
if let StatusMessage::Legacy(s) = status {
if s.total_difficulty.bit_len() > 160 {
unauth
.disconnect(DisconnectReason::ProtocolBreach)
.await
.map_err(EthStreamError::from)?;
return Err(EthHandshakeError::TotalDifficultyBitLenTooLarge {
got: s.total_difficulty.bit_len(),
maximum: 160,
}
.into());
}
.into());
}
// Fork validation
@@ -202,7 +205,7 @@ where
return Err(err.into());
}
Ok(their_status_message.to_legacy())
Ok(UnifiedStatus::from_message(their_status_message))
}
_ => {
unauth

View File

@@ -20,7 +20,8 @@ use crate::{
capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
errors::{EthStreamError, P2PStreamError},
p2pstream::DisconnectP2P,
CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream,
CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, UnauthedEthStream,
UnifiedStatus,
};
use bytes::{Bytes, BytesMut};
use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
@@ -207,9 +208,9 @@ impl<St> RlpxProtocolMultiplexer<St> {
/// primary protocol.
pub async fn into_eth_satellite_stream<N: NetworkPrimitives>(
self,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, Status), EthStreamError>
) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, UnifiedStatus), EthStreamError>
where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
{

View File

@@ -4,7 +4,7 @@
use crate::{
hello::DEFAULT_TCP_PORT, EthVersion, HelloMessageWithProtocols, P2PStream, ProtocolVersion,
Status, UnauthedP2PStream,
Status, StatusMessage, UnauthedP2PStream, UnifiedStatus,
};
use alloy_chains::Chain;
use alloy_primitives::{B256, U256};
@@ -32,7 +32,7 @@ pub fn eth_hello() -> (HelloMessageWithProtocols, SecretKey) {
}
/// Returns testing eth handshake status and fork filter.
pub fn eth_handshake() -> (Status, ForkFilter) {
pub fn eth_handshake() -> (UnifiedStatus, ForkFilter) {
let genesis = B256::random();
let fork_filter = ForkFilter::new(Head::default(), genesis, 0, Vec::new());
@@ -45,7 +45,9 @@ pub fn eth_handshake() -> (Status, ForkFilter) {
// Pass the current fork id.
forkid: fork_filter.current(),
};
(status, fork_filter)
let unified_status = UnifiedStatus::from_message(StatusMessage::Legacy(status));
(unified_status, fork_filter)
}
/// Connects to a remote node and returns an authenticated `P2PStream` with the remote node.

View File

@@ -4,7 +4,7 @@ use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts,
Status,
UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_p2p::error::{RequestError, RequestResult};
@@ -63,7 +63,7 @@ pub struct SessionInfo {
/// Capabilities the peer announced.
pub capabilities: Arc<Capabilities>,
/// The status of the peer to which a session was established.
pub status: Arc<Status>,
pub status: Arc<UnifiedStatus>,
/// Negotiated eth version of the session.
pub version: EthVersion,
/// The kind of peer this session represents

View File

@@ -35,7 +35,7 @@ pub use events::{
};
use reth_eth_wire_types::{
capability::Capabilities, DisconnectReason, EthVersion, NetworkPrimitives, Status,
capability::Capabilities, DisconnectReason, EthVersion, NetworkPrimitives, UnifiedStatus,
};
use reth_network_p2p::sync::NetworkSyncUpdater;
use reth_network_peers::NodeRecord;
@@ -238,7 +238,7 @@ pub struct PeerInfo {
/// The negotiated eth version.
pub eth_version: EthVersion,
/// The Status message the peer sent for the `eth` handshake
pub status: Arc<Status>,
pub status: Arc<UnifiedStatus>,
/// The timestamp when the session to that peer has been established.
pub session_established: Instant,
/// The peer's connection kind

View File

@@ -12,7 +12,8 @@ use reth_discv5::NetworkStackId;
use reth_dns_discovery::DnsDiscoveryConfig;
use reth_eth_wire::{
handshake::{EthHandshake, EthRlpxHandshake},
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives, Status,
EthNetworkPrimitives, HelloMessage, HelloMessageWithProtocols, NetworkPrimitives,
UnifiedStatus,
};
use reth_ethereum_forks::{ForkFilter, Head};
use reth_network_peers::{mainnet_nodes, pk2id, sepolia_nodes, PeerId, TrustedPeer};
@@ -37,7 +38,7 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// The client type that can interact with the chain.
///
/// This type is used to fetch the block number after we established a session and received the
/// [Status] block hash.
/// [`UnifiedStatus`] block hash.
pub client: C,
/// The node's secret key, from which the node's identity is derived.
pub secret_key: SecretKey,
@@ -73,7 +74,7 @@ pub struct NetworkConfig<C, N: NetworkPrimitives = EthNetworkPrimitives> {
/// The executor to use for spawning tasks.
pub executor: Box<dyn TaskSpawner>,
/// The `Status` message to send to peers at the beginning.
pub status: Status,
pub status: UnifiedStatus,
/// Sets the hello message for the p2p handshake in `RLPx`
pub hello_message: HelloMessageWithProtocols,
/// Additional protocols to announce and handle in `RLPx`
@@ -296,7 +297,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
/// Sets the highest synced block.
///
/// This is used to construct the appropriate [`ForkFilter`] and [`Status`] message.
/// This is used to construct the appropriate [`ForkFilter`] and [`UnifiedStatus`] message.
///
/// If not set, this defaults to the genesis specified by the current chain specification.
pub const fn set_head(mut self, head: Head) -> Self {
@@ -622,7 +623,7 @@ impl<N: NetworkPrimitives> NetworkConfigBuilder<N> {
hello_message.port = listener_addr.port();
// set the status
let status = Status::spec_builder(&chain_spec, &head).build();
let status = UnifiedStatus::spec_builder(&chain_spec, &head);
// set a fork filter based on the chain spec and head
let fork_filter = chain_spec.fork_filter(head);

View File

@@ -856,8 +856,8 @@ mod tests {
use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockBodies,
HelloMessageWithProtocols, P2PStream, Status, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream,
HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
UnifiedStatus,
};
use reth_ethereum_forks::EthereumHardfork;
use reth_network_peers::pk2id;
@@ -881,7 +881,7 @@ mod tests {
secret_key: SecretKey,
local_peer_id: PeerId,
hello: HelloMessageWithProtocols,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
next_id: usize,
}

View File

@@ -7,7 +7,8 @@ use crate::{
};
use reth_ecies::ECIESError;
use reth_eth_wire::{
errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives, Status,
errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives,
UnifiedStatus,
};
use reth_network_api::PeerInfo;
use reth_network_peers::{NodeRecord, PeerId};
@@ -73,7 +74,7 @@ pub struct ActiveSessionHandle<N: NetworkPrimitives> {
/// The local address of the connection.
pub(crate) local_addr: Option<SocketAddr>,
/// The Status message the peer sent for the `eth` handshake
pub(crate) status: Arc<Status>,
pub(crate) status: Arc<UnifiedStatus>,
}
// === impl ActiveSessionHandle ===
@@ -173,7 +174,7 @@ pub enum PendingSessionEvent<N: NetworkPrimitives> {
/// All capabilities the peer announced
capabilities: Arc<Capabilities>,
/// The Status message the peer sent for the `eth` handshake
status: Arc<Status>,
status: Arc<UnifiedStatus>,
/// The actual connection stream which can be used to send and receive `eth` protocol
/// messages
conn: EthRlpxConnection<N>,

View File

@@ -35,7 +35,7 @@ use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
Capabilities, DisconnectReason, EthStream, EthVersion, HelloMessageWithProtocols,
NetworkPrimitives, Status, UnauthedP2PStream, HANDSHAKE_TIMEOUT,
NetworkPrimitives, UnauthedP2PStream, UnifiedStatus, HANDSHAKE_TIMEOUT,
};
use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
use reth_metrics::common::mpsc::MeteredPollSender;
@@ -77,7 +77,7 @@ pub struct SessionManager<N: NetworkPrimitives> {
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The `Status` message to send to peers.
status: Status,
status: UnifiedStatus,
/// The `HelloMessage` message to send to peers.
hello_message: HelloMessageWithProtocols,
/// The [`ForkFilter`] used to validate the peer's `Status` message.
@@ -126,7 +126,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
secret_key: SecretKey,
config: SessionsConfig,
executor: Box<dyn TaskSpawner>,
status: Status,
status: UnifiedStatus,
hello_message: HelloMessageWithProtocols,
fork_filter: ForkFilter,
extra_protocols: RlpxSubProtocols,
@@ -175,7 +175,7 @@ impl<N: NetworkPrimitives> SessionManager<N> {
}
/// Returns the current status of the session.
pub const fn status(&self) -> Status {
pub const fn status(&self) -> UnifiedStatus {
self.status
}
@@ -220,9 +220,11 @@ impl<N: NetworkPrimitives> SessionManager<N> {
/// active [`ForkId`]. See also [`ForkFilter::set_head`].
pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
self.status.blockhash = head.hash;
self.status.total_difficulty = head.total_difficulty;
self.status.total_difficulty = Some(head.total_difficulty);
let transition = self.fork_filter.set_head(head);
self.status.forkid = self.fork_filter.current();
self.status.latest_block = Some(head.number);
transition
}
@@ -681,7 +683,7 @@ pub enum SessionEvent<N: NetworkPrimitives> {
/// negotiated eth version
version: EthVersion,
/// The Status message the peer sent during the `eth` handshake
status: Arc<Status>,
status: Arc<UnifiedStatus>,
/// The channel for sending messages to the peer with the session
messages: PeerRequestSender<PeerRequest<N>>,
/// The direction of the session, either `Inbound` or `Outgoing`
@@ -828,7 +830,7 @@ pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
remote_addr: SocketAddr,
secret_key: SecretKey,
hello: HelloMessageWithProtocols,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
extra_handlers: RlpxSubProtocolHandlers,
) {
@@ -861,7 +863,7 @@ async fn start_pending_outbound_session<N: NetworkPrimitives>(
remote_peer_id: PeerId,
secret_key: SecretKey,
hello: HelloMessageWithProtocols,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
extra_handlers: RlpxSubProtocolHandlers,
) {
@@ -913,7 +915,7 @@ async fn authenticate<N: NetworkPrimitives>(
secret_key: SecretKey,
direction: Direction,
hello: HelloMessageWithProtocols,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
extra_handlers: RlpxSubProtocolHandlers,
) {
@@ -996,7 +998,7 @@ async fn authenticate_stream<N: NetworkPrimitives>(
local_addr: Option<SocketAddr>,
direction: Direction,
mut hello: HelloMessageWithProtocols,
mut status: Status,
mut status: UnifiedStatus,
fork_filter: ForkFilter,
mut extra_handlers: RlpxSubProtocolHandlers,
) -> PendingSessionEvent<N> {
@@ -1068,7 +1070,7 @@ async fn authenticate_stream<N: NetworkPrimitives>(
.await
{
Ok(their_status) => {
let eth_stream = EthStream::new(status.version, p2p_stream);
let eth_stream = EthStream::new(eth_version, p2p_stream);
(eth_stream.into(), their_status)
}
Err(err) => {

View File

@@ -13,7 +13,7 @@ use alloy_primitives::B256;
use rand::seq::SliceRandom;
use reth_eth_wire::{
BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
NewBlockHashes, Status,
NewBlockHashes, UnifiedStatus,
};
use reth_ethereum_forks::ForkId;
use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
@@ -82,7 +82,7 @@ pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
/// The client type that can interact with the chain.
///
/// This type is used to fetch the block number after we established a session and received the
/// [Status] block hash.
/// [`UnifiedStatus`] block hash.
client: BlockNumReader,
/// Network discovery.
discovery: Discovery,
@@ -146,7 +146,7 @@ impl<N: NetworkPrimitives> NetworkState<N> {
&mut self,
peer: PeerId,
capabilities: Arc<Capabilities>,
status: Arc<Status>,
status: Arc<UnifiedStatus>,
request_tx: PeerRequestSender<PeerRequest<N>>,
timeout: Arc<AtomicU64>,
) {

View File

@@ -9,7 +9,7 @@ use crate::{
use futures::Stream;
use reth_eth_wire::{
errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
NetworkPrimitives, Status,
NetworkPrimitives, UnifiedStatus,
};
use reth_network_api::{PeerRequest, PeerRequestSender};
use reth_network_peers::PeerId;
@@ -382,7 +382,7 @@ pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
/// negotiated eth version
version: EthVersion,
messages: PeerRequestSender<PeerRequest<N>>,
status: Arc<Status>,
status: Arc<UnifiedStatus>,
direction: Direction,
},
SessionClosed {

View File

@@ -4,8 +4,9 @@ use futures::SinkExt;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError},
handshake::{EthRlpxHandshake, EthereumEthHandshake, UnauthEth},
UnifiedStatus,
};
use reth_eth_wire_types::{DisconnectReason, EthVersion, Status};
use reth_eth_wire_types::{DisconnectReason, EthVersion};
use reth_ethereum_forks::ForkFilter;
use std::{future::Future, pin::Pin};
use tokio::time::{timeout, Duration};
@@ -21,8 +22,8 @@ impl BscHandshake {
/// Negotiate the upgrade status message.
pub async fn upgrade_status(
unauth: &mut dyn UnauthEth,
negotiated_status: Status,
) -> Result<Status, EthStreamError> {
negotiated_status: UnifiedStatus,
) -> Result<UnifiedStatus, EthStreamError> {
if negotiated_status.version > EthVersion::Eth66 {
// Send upgrade status message allowing peer to broadcast transactions
let upgrade_msg = UpgradeStatus {
@@ -66,10 +67,10 @@ impl EthRlpxHandshake for BscHandshake {
fn handshake<'a>(
&'a self,
unauth: &'a mut dyn UnauthEth,
status: Status,
status: UnifiedStatus,
fork_filter: ForkFilter,
timeout_limit: Duration,
) -> Pin<Box<dyn Future<Output = Result<Status, EthStreamError>> + 'a + Send>> {
) -> Pin<Box<dyn Future<Output = Result<UnifiedStatus, EthStreamError>> + 'a + Send>> {
Box::pin(async move {
let fut = async {
let negotiated_status =

View File

@@ -19,8 +19,8 @@ use reth_ethereum::{
network::{
config::rng_secret_key,
eth_wire::{
EthMessage, EthStream, HelloMessage, P2PStream, Status, UnauthedEthStream,
UnauthedP2PStream,
EthMessage, EthStream, HelloMessage, P2PStream, UnauthedEthStream, UnauthedP2PStream,
UnifiedStatus,
},
EthNetworkPrimitives,
},
@@ -101,20 +101,24 @@ async fn handshake_p2p(
}
// Perform a ETH Wire handshake with a peer
async fn handshake_eth(p2p_stream: AuthedP2PStream) -> eyre::Result<(AuthedEthStream, Status)> {
async fn handshake_eth(
p2p_stream: AuthedP2PStream,
) -> eyre::Result<(AuthedEthStream, UnifiedStatus)> {
let fork_filter = MAINNET.fork_filter(Head {
timestamp: MAINNET.fork(EthereumHardfork::Shanghai).as_timestamp().unwrap(),
..Default::default()
});
let status = Status::builder()
let unified_status = UnifiedStatus::builder()
.chain(Chain::mainnet())
.genesis(MAINNET_GENESIS_HASH)
.forkid(MAINNET.hardfork_fork_id(EthereumHardfork::Shanghai).unwrap())
.build();
let status =
Status { version: p2p_stream.shared_capabilities().eth()?.version().try_into()?, ..status };
let status = UnifiedStatus {
version: p2p_stream.shared_capabilities().eth()?.version().try_into()?,
..unified_status
};
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
Ok(eth_unauthed.handshake(status, fork_filter).await?)
}