feat: respect BlockRangeInfo when selecting peer for request (#16704)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
gejeduck
2025-11-20 13:19:11 -05:00
committed by GitHub
parent 55dacfc739
commit 9cdcc8e087
3 changed files with 496 additions and 23 deletions

View File

@@ -158,6 +158,15 @@ pub struct Capabilities {
}
impl Capabilities {
/// Create a new instance from the given vec.
pub fn new(value: Vec<Capability>) -> Self {
Self {
eth_66: value.iter().any(Capability::is_eth_v66),
eth_67: value.iter().any(Capability::is_eth_v67),
eth_68: value.iter().any(Capability::is_eth_v68),
inner: value,
}
}
/// Returns all capabilities.
#[inline]
pub fn capabilities(&self) -> &[Capability] {
@@ -197,12 +206,7 @@ impl Capabilities {
impl From<Vec<Capability>> for Capabilities {
fn from(value: Vec<Capability>) -> Self {
Self {
eth_66: value.iter().any(Capability::is_eth_v66),
eth_67: value.iter().any(Capability::is_eth_v67),
eth_68: value.iter().any(Capability::is_eth_v68),
inner: value,
}
Self::new(value)
}
}

View File

@@ -139,8 +139,9 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
/// Returns the _next_ idle peer that's ready to accept a request,
/// prioritizing those with the lowest timeout/latency and those that recently responded with
/// adequate data.
fn next_best_peer(&self) -> Option<PeerId> {
/// adequate data. Additionally, if full blocks are required this prioritizes peers that have
/// full history available
fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
let mut best_peer = idle.next()?;
@@ -152,7 +153,13 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
continue
}
// replace best peer if this peer has better rtt
// replace best peer if this peer meets the requirements better
if maybe_better.1.is_better(best_peer.1, &requirement) {
best_peer = maybe_better;
continue
}
// replace best peer if this peer has better rtt and both have same range quality
if maybe_better.1.timeout() < best_peer.1.timeout() &&
!maybe_better.1.last_response_likely_bad
{
@@ -170,9 +177,13 @@ impl<N: NetworkPrimitives> StateFetcher<N> {
return PollAction::NoRequests
}
let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
let request = self.queued_requests.pop_front().expect("not empty");
let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
// need to put back the the request
self.queued_requests.push_front(request);
return PollAction::NoPeersAvailable
};
let request = self.prepare_block_request(peer_id, request);
PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
@@ -358,7 +369,6 @@ struct Peer {
/// lowest timeout.
last_response_likely_bad: bool,
/// Tracks the range info for the peer.
#[allow(dead_code)]
range_info: Option<BlockRangeInfo>,
}
@@ -366,6 +376,74 @@ impl Peer {
fn timeout(&self) -> u64 {
self.timeout.load(Ordering::Relaxed)
}
/// Returns the earliest block number available from the peer.
fn earliest(&self) -> u64 {
self.range_info.as_ref().map_or(0, |info| info.earliest())
}
/// Returns true if the peer has the full history available.
fn has_full_history(&self) -> bool {
self.earliest() == 0
}
fn range(&self) -> Option<RangeInclusive<u64>> {
self.range_info.as_ref().map(|info| info.range())
}
/// Returns true if this peer has a better range than the other peer for serving the requested
/// range.
///
/// A peer has a "better range" if:
/// 1. It can fully cover the requested range while the other cannot
/// 2. None can fully cover the range, but this peer has lower start value
/// 3. If a peer doesnt announce a range we assume it has full history, but check the other's
/// range and treat that as better if it can cover the range
fn has_better_range(&self, other: &Self, range: RangeInclusive<u64>) -> bool {
let self_range = self.range();
let other_range = other.range();
match (self_range, other_range) {
(Some(self_r), Some(other_r)) => {
// Check if each peer can fully cover the requested range
let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
#[allow(clippy::match_same_arms)]
match (self_covers, other_covers) {
(true, false) => true, // Only self covers the range
(false, true) => false, // Only other covers the range
(true, true) => false, // Both cover
(false, false) => {
// neither covers - prefer if peer has lower (better) start range
self_r.start() < other_r.start()
}
}
}
(Some(self_r), None) => {
// Self has range info, other doesn't (treated as full history with unknown latest)
// Self is better only if it covers the range
self_r.contains(range.start()) && self_r.contains(range.end())
}
(None, Some(other_r)) => {
// Self has no range info (full history), other has range info
// Self is better only if other doesn't cover the range
!(other_r.contains(range.start()) && other_r.contains(range.end()))
}
(None, None) => false, // Neither has range info - no one is better
}
}
/// Returns true if this peer is better than the other peer based on the given requirements.
fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
match requirement {
BestPeerRequirements::None => false,
BestPeerRequirements::FullBlockRange(range) => {
self.has_better_range(other, range.clone())
}
BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
}
}
}
/// Tracks the state of an individual peer
@@ -427,7 +505,6 @@ pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
request: Vec<B256>,
response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
priority: Priority,
#[allow(dead_code)]
range_hint: Option<RangeInclusive<u64>>,
},
}
@@ -456,6 +533,20 @@ impl<N: NetworkPrimitives> DownloadRequest<N> {
const fn is_normal_priority(&self) -> bool {
self.get_priority().is_normal()
}
/// Returns the best peer requirements for this request.
fn best_peer_requirements(&self) -> BestPeerRequirements {
match self {
Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
Self::GetBlockBodies { range_hint, .. } => {
if let Some(range) = range_hint {
BestPeerRequirements::FullBlockRange(range.clone())
} else {
BestPeerRequirements::FullBlock
}
}
}
}
}
/// An action the syncer can emit.
@@ -480,6 +571,16 @@ pub(crate) enum BlockResponseOutcome {
BadResponse(PeerId, ReputationChangeKind),
}
/// Additional requirements for how to rank peers during selection.
enum BestPeerRequirements {
/// No additional requirements
None,
/// Peer must have this block range available.
FullBlockRange(RangeInclusive<u64>),
/// Peer must have full range.
FullBlock,
}
#[cfg(test)]
mod tests {
use super::*;
@@ -536,17 +637,17 @@ mod tests {
None,
);
let first_peer = fetcher.next_best_peer().unwrap();
let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
assert!(first_peer == peer1 || first_peer == peer2);
// Pending disconnect for first_peer
fetcher.on_pending_disconnect(&first_peer);
// first_peer now isn't idle, so we should get other peer
let second_peer = fetcher.next_best_peer().unwrap();
let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
assert!(first_peer == peer1 || first_peer == peer2);
assert_ne!(first_peer, second_peer);
// without idle peers, returns None
fetcher.on_pending_disconnect(&second_peer);
assert_eq!(fetcher.next_best_peer(), None);
assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
}
#[tokio::test]
@@ -588,13 +689,13 @@ mod tests {
);
// Must always get peer1 (lowest timeout)
assert_eq!(fetcher.next_best_peer(), Some(peer1));
assert_eq!(fetcher.next_best_peer(), Some(peer1));
assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
// peer2's timeout changes below peer1's
peer2_timeout.store(10, Ordering::Relaxed);
// Then we get peer 2 always (now lowest)
assert_eq!(fetcher.next_best_peer(), Some(peer2));
assert_eq!(fetcher.next_best_peer(), Some(peer2));
assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
}
#[tokio::test]
@@ -684,4 +785,367 @@ mod tests {
assert!(fetcher.peers[&peer_id].state.is_idle());
}
#[test]
fn test_peer_is_better_none_requirement() {
let peer1 = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
};
let peer2 = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 50,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(20)),
last_response_likely_bad: false,
range_info: None,
};
// With None requirement, is_better should always return false
assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
}
#[test]
fn test_peer_is_better_full_block_requirement() {
// Peer with full history (earliest = 0)
let peer_full = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
};
// Peer without full history (earliest = 50)
let peer_partial = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
};
// Peer without range info (treated as full history)
let peer_no_range = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: None,
};
// Peer with full history is better than peer without
assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
// Peer without range info (full history) is better than partial
assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
// Both have full history - no improvement
assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
}
#[test]
fn test_peer_is_better_full_block_range_requirement() {
let range = RangeInclusive::new(40, 60);
// Peer that covers the requested range
let peer_covers = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
};
// Peer that doesn't cover the range (earliest too high)
let peer_no_cover = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
};
// Peer that covers the requested range is better than one that doesn't
assert!(peer_covers
.is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(
!peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
);
}
#[test]
fn test_peer_is_better_both_cover_range() {
let range = RangeInclusive::new(30, 50);
// Peer with full history that covers the range
let peer_full = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
};
// Peer without full history that also covers the range
let peer_partial = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
};
// When both cover the range, prefer none
assert!(!peer_full
.is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
}
#[test]
fn test_peer_is_better_lower_start() {
let range = RangeInclusive::new(30, 60);
// Peer with full history that covers the range
let peer_full = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
};
// Peer without full history that also covers the range
let peer_partial = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
};
// When both cover the range, prefer lower start value
assert!(peer_full
.is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
}
#[test]
fn test_peer_is_better_neither_covers_range() {
let range = RangeInclusive::new(40, 60);
// Peer with full history that doesn't cover the range (latest too low)
let peer_full = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 30,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
};
// Peer without full history that also doesn't cover the range
let peer_partial = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 30,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
};
// When neither covers the range, prefer full history
assert!(peer_full
.is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
}
#[test]
fn test_peer_is_better_no_range_info() {
let range = RangeInclusive::new(40, 60);
// Peer with range info
let peer_with_range = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
};
// Peer without range info
let peer_no_range = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: None,
};
// Peer without range info is not better (we prefer peers with known ranges)
assert!(!peer_no_range
.is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
// Peer with range info is better than peer without
assert!(
peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
);
}
#[test]
fn test_peer_is_better_one_peer_no_range_covers() {
let range = RangeInclusive::new(40, 60);
// Peer with range info that covers the requested range
let peer_with_range_covers = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
};
// Peer without range info (treated as full history with unknown latest)
let peer_no_range = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: None,
};
// Peer with range that covers is better than peer without range info
assert!(peer_with_range_covers
.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
// Peer without range info is not better when other covers
assert!(!peer_no_range
.is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
}
#[test]
fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
let range = RangeInclusive::new(40, 60);
// Peer with range info that does NOT cover the requested range (too high)
let peer_with_range_no_cover = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
};
// Peer without range info (treated as full history)
let peer_no_range = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: None,
};
// Peer with range that doesn't cover is not better
assert!(!peer_with_range_no_cover
.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
// Peer without range info (full history) is better when other doesn't cover
assert!(peer_no_range
.is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
}
#[test]
fn test_peer_is_better_edge_cases() {
// Test exact range boundaries
let range = RangeInclusive::new(50, 100);
// Peer that exactly covers the range
let peer_exact = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
};
// Peer that's one block short at the start
let peer_short_start = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
};
// Peer that's one block short at the end
let peer_short_end = Peer {
state: PeerState::Idle,
best_hash: B256::random(),
best_number: 100,
capabilities: Arc::new(Capabilities::new(vec![])),
timeout: Arc::new(AtomicU64::new(10)),
last_response_likely_bad: false,
range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
};
// Exact coverage is better than short coverage
assert!(peer_exact
.is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(peer_exact
.is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
// Short coverage is not better than exact coverage
assert!(!peer_short_start
.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
assert!(
!peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
);
}
}

View File

@@ -11,7 +11,7 @@ use std::{
},
};
/// Information about the range of blocks available from a peer.
/// Information about the range of full blocks available from a peer.
///
/// This represents the announced `eth69`
/// [`BlockRangeUpdate`] of a peer.
@@ -45,12 +45,12 @@ impl BlockRangeInfo {
RangeInclusive::new(earliest, latest)
}
/// Returns the earliest block number available from the peer.
/// Returns the earliest full block number available from the peer.
pub fn earliest(&self) -> u64 {
self.inner.earliest.load(Ordering::Relaxed)
}
/// Returns the latest block number available from the peer.
/// Returns the latest full block number available from the peer.
pub fn latest(&self) -> u64 {
self.inner.latest.load(Ordering::Relaxed)
}
@@ -60,6 +60,11 @@ impl BlockRangeInfo {
*self.inner.latest_hash.read()
}
/// Returns true if the peer has the full history available.
pub fn has_full_history(&self) -> bool {
self.earliest() == 0
}
/// Updates the range information.
pub fn update(&self, earliest: u64, latest: u64, latest_hash: B256) {
self.inner.earliest.store(earliest, Ordering::Relaxed);