Compare commits

...

4 Commits

Author       SHA1        Message                                       Date
Manu NALEPA  9993547716  Modify changelog.                             2024-09-19 09:43:25 +02:00
Manu NALEPA  feffe239d3  maintainPeerStatuses: Refactor a little bit.  2024-09-19 09:41:02 +02:00
Manu NALEPA  f3430693c6  PeersToPrune: Add comments.                   2024-09-19 09:40:40 +02:00
Manu NALEPA  1b99283fb3  PeersToPrune: Refactor.                       2024-09-19 09:40:37 +02:00

    The only difference between `PeersToPrune` and `deprecatedPeersToPrune` is the way
    peers to prune are sorted. All the rest of the code is a duplicate. This commit
    removes all code duplication.
3 changed files with 63 additions and 97 deletions
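
Note: the pattern the refactor in 1b99283fb3 converges on can be shown in isolation: keep a single selection pass and a single sort call, and only swap the comparator depending on whether the peer scorer is enabled. The sketch below is a minimal, self-contained illustration with made-up types and values; it is not the Prysm code itself.

    package main

    import (
        "fmt"
        "sort"
    )

    // peerInfo mirrors the fields the PR sorts on; the name is illustrative,
    // not an actual Prysm type.
    type peerInfo struct {
        id      string
        score   float64
        badResp int
    }

    // sortCandidates shows the de-duplication pattern: collect candidates once,
    // then pick the comparator based on the feature flag, instead of duplicating
    // the whole selection loop in two functions.
    func sortCandidates(candidates []peerInfo, peerScorerEnabled bool) {
        // Default: prune peers with the most bad responses first (descending).
        less := func(i, j int) bool {
            return candidates[i].badResp > candidates[j].badResp
        }
        if peerScorerEnabled {
            // With the scorer enabled, prune the lowest-scored peers first (ascending).
            less = func(i, j int) bool {
                return candidates[i].score < candidates[j].score
            }
        }
        sort.Slice(candidates, less)
    }

    func main() {
        peers := []peerInfo{
            {id: "a", score: 0.9, badResp: 1},
            {id: "b", score: 0.2, badResp: 5},
            {id: "c", score: 0.5, badResp: 3},
        }
        sortCandidates(peers, true)
        fmt.Println(peers) // lowest score first: b, c, a
    }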

View File

@@ -37,6 +37,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
 - Updated k8s-io/client-go to v0.30.4 and k8s-io/apimachinery to v0.30.4
 - Migrated tracing library from opencensus to opentelemetry for both the beacon node and validator.
 - Refactored light client code to make it more readable and make future PRs easier.
+- P2P: Refactored functions managing the peers to prune.
 ### Deprecated
 - `--disable-grpc-gateway` flag is deprecated due to grpc gateway removal.

View File

@@ -792,46 +792,61 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch primitives.Epoch) (
 // bad response count. In the future scoring will be used
 // to determine the most suitable peers to take out.
 func (p *Status) PeersToPrune() []peer.ID {
-    if !features.Get().EnablePeerScorer {
-        return p.deprecatedPeersToPrune()
-    }
     connLimit := p.ConnectedPeerLimit()
     inBoundLimit := uint64(p.InboundLimit())
-    activePeers := p.Active()
-    numInboundPeers := uint64(len(p.InboundConnected()))
-    // Exit early if we are still below our max
-    // limit.
-    if uint64(len(activePeers)) <= connLimit {
+    activePeerCount := uint64(len(p.Active()))
+    inboundPeerCount := uint64(len(p.InboundConnected()))
+    // Exit early if we are still below our max limit.
+    if activePeerCount <= connLimit {
         return []peer.ID{}
     }
     p.store.Lock()
     defer p.store.Unlock()
     type peerResp struct {
-        pid   peer.ID
-        score float64
+        pid     peer.ID
+        score   float64
+        badResp int
     }
     peersToPrune := make([]*peerResp, 0)
-    // Select connected and inbound peers to prune.
+    // A peer is a candidate for pruning if:
+    // - it is connected, and
+    // - it is an inbound peer, and
+    // - it is not a trusted peer
     for pid, peerData := range p.store.Peers() {
-        if peerData.ConnState == PeerConnected &&
-            peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
-            peersToPrune = append(peersToPrune, &peerResp{
-                pid:   pid,
-                score: p.scorers.ScoreNoLock(pid),
-            })
+        isConnected := peerData.ConnState == PeerConnected
+        isInbound := peerData.Direction == network.DirInbound
+        isTrusted := p.store.IsTrustedPeer(pid)
+        if isInbound && isConnected && !isTrusted {
+            peerToPrune := &peerResp{
+                pid:     pid,
+                score:   p.scorers.ScoreNoLock(pid),
+                badResp: peerData.BadResponses,
+            }
+            peersToPrune = append(peersToPrune, peerToPrune)
         }
     }
-    // Sort in ascending order to favour pruning peers with a
-    // lower score.
-    sort.Slice(peersToPrune, func(i, j int) bool {
-        return peersToPrune[i].score < peersToPrune[j].score
-    })
+    sortFunc := func(i, j int) bool {
+        return peersToPrune[i].badResp > peersToPrune[j].badResp
+    }
-    // Determine amount of peers to prune using our
-    // max connection limit.
-    amountToPrune, err := pmath.Sub64(uint64(len(activePeers)), connLimit)
+    if features.Get().EnablePeerScorer {
+        sortFunc = func(i, j int) bool {
+            return peersToPrune[i].score < peersToPrune[j].score
+        }
+    }
+    // Sort in ascending order to favour pruning peers with a lower score.
+    sort.Slice(peersToPrune, sortFunc)
+    // Determine amount of peers to prune using our max connection limit.
+    amountToPrune, err := pmath.Sub64(activePeerCount, connLimit)
     if err != nil {
         // This should never happen.
         log.WithError(err).Error("Failed to determine amount of peers to prune")
@@ -840,86 +855,24 @@ func (p *Status) PeersToPrune() []peer.ID {
     // Also check for inbound peers above our limit.
     excessInbound := uint64(0)
-    if numInboundPeers > inBoundLimit {
-        excessInbound = numInboundPeers - inBoundLimit
+    if inboundPeerCount > inBoundLimit {
+        excessInbound = inboundPeerCount - inBoundLimit
     }
-    // Prune the largest amount between excess peers and
-    // excess inbound peers.
+    // Prune the largest amount between excess peers and excess inbound peers.
     if excessInbound > amountToPrune {
         amountToPrune = excessInbound
     }
     if amountToPrune < uint64(len(peersToPrune)) {
         peersToPrune = peersToPrune[:amountToPrune]
     }
     ids := make([]peer.ID, 0, len(peersToPrune))
     for _, pr := range peersToPrune {
         ids = append(ids, pr.pid)
     }
     return ids
 }
-// Deprecated: Is used to represent the older method
-// of pruning which utilized bad response counts.
-func (p *Status) deprecatedPeersToPrune() []peer.ID {
-    connLimit := p.ConnectedPeerLimit()
-    inBoundLimit := p.InboundLimit()
-    activePeers := p.Active()
-    numInboundPeers := len(p.InboundConnected())
-    // Exit early if we are still below our max
-    // limit.
-    if uint64(len(activePeers)) <= connLimit {
-        return []peer.ID{}
-    }
-    p.store.Lock()
-    defer p.store.Unlock()
-    type peerResp struct {
-        pid     peer.ID
-        badResp int
-    }
-    peersToPrune := make([]*peerResp, 0)
-    // Select connected and inbound peers to prune.
-    for pid, peerData := range p.store.Peers() {
-        if peerData.ConnState == PeerConnected &&
-            peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
-            peersToPrune = append(peersToPrune, &peerResp{
-                pid:     pid,
-                badResp: peerData.BadResponses,
-            })
-        }
-    }
-    // Sort in descending order to favour pruning peers with a
-    // higher bad response count.
-    sort.Slice(peersToPrune, func(i, j int) bool {
-        return peersToPrune[i].badResp > peersToPrune[j].badResp
-    })
-    // Determine amount of peers to prune using our
-    // max connection limit.
-    amountToPrune, err := pmath.Sub64(uint64(len(activePeers)), connLimit)
-    if err != nil {
-        // This should never happen
-        log.WithError(err).Error("Failed to determine amount of peers to prune")
-        return []peer.ID{}
-    }
-    // Also check for inbound peers above our limit.
-    excessInbound := uint64(0)
-    if numInboundPeers > inBoundLimit {
-        excessInbound = uint64(numInboundPeers - inBoundLimit)
-    }
-    // Prune the largest amount between excess peers and
-    // excess inbound peers.
-    if excessInbound > amountToPrune {
-        amountToPrune = excessInbound
-    }
-    if amountToPrune < uint64(len(peersToPrune)) {
-        peersToPrune = peersToPrune[:amountToPrune]
-    }
-    ids := make([]peer.ID, 0, len(peersToPrune))
-    for _, pr := range peersToPrune {
-        ids = append(ids, pr.pid)
-    }
-    return ids
-}
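
Note: to make the pruning-amount logic above easier to follow, here is a small standalone sketch of the same arithmetic with hypothetical numbers. The helper name and the values are illustrative only; they are not Prysm APIs.

    package main

    import "fmt"

    // pruneCount reproduces the sizing logic from PeersToPrune in isolation:
    // prune the larger of (active peers above the connection limit) and
    // (inbound peers above the inbound limit), capped by the number of
    // prunable candidates.
    func pruneCount(active, connLimit, inbound, inboundLimit, candidates uint64) uint64 {
        if active <= connLimit {
            return 0
        }
        amount := active - connLimit
        if inbound > inboundLimit {
            if excess := inbound - inboundLimit; excess > amount {
                amount = excess
            }
        }
        if amount > candidates {
            amount = candidates
        }
        return amount
    }

    func main() {
        // e.g. 70 active peers with a limit of 55, 48 inbound with a limit of 30,
        // and 40 inbound candidates eligible for pruning: the inbound excess (18)
        // exceeds the active excess (15), so 18 peers are pruned.
        fmt.Println(pruneCount(70, 55, 48, 30, 40)) // prints 18
    }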

View File

@@ -28,34 +28,44 @@ import (
 // maintainPeerStatuses by infrequently polling peers for their latest status.
 func (s *Service) maintainPeerStatuses() {
     // Run twice per epoch.
-    interval := time.Duration(params.BeaconConfig().SlotsPerEpoch.Div(2).Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
+    slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
+    secondsPerSlot := params.BeaconConfig().SecondsPerSlot
+    interval := time.Duration(slotsPerEpoch.Div(2).Mul(secondsPerSlot)) * time.Second
     async.RunEvery(s.ctx, interval, func() {
         wg := new(sync.WaitGroup)
         for _, pid := range s.cfg.p2p.Peers().Connected() {
             wg.Add(1)
             go func(id peer.ID) {
                 defer wg.Done()
                 // If our peer status has not been updated correctly we disconnect over here
                 // and set the connection state over here instead.
-                if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
+                isNotConnected := s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected
+                if isNotConnected {
                     s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnecting)
                     if err := s.cfg.p2p.Disconnect(id); err != nil {
                         log.WithError(err).Debug("Error when disconnecting with peer")
                     }
                     s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnected)
                     return
                 }
                 // Disconnect from peers that are considered bad by any of the registered scorers.
                 if s.cfg.p2p.Peers().IsBad(id) {
                     s.disconnectBadPeer(s.ctx, id)
                     return
                 }
                 // If the status hasn't been updated in the recent interval time.
                 lastUpdated, err := s.cfg.p2p.Peers().ChainStateLastUpdated(id)
                 if err != nil {
                     // Peer has vanished; nothing to do.
                     return
                 }
                 if prysmTime.Now().After(lastUpdated.Add(interval)) {
                     if err := s.reValidatePeer(s.ctx, id); err != nil {
                         log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
@@ -64,9 +74,11 @@ func (s *Service) maintainPeerStatuses() {
                 }
             }(pid)
         }
-        // Wait for all status checks to finish and then proceed onwards to
-        // pruning excess peers.
+        // Wait for all status checks to finish.
         wg.Wait()
+        // Proceed onwards to pruning excess peers.
         peerIds := s.cfg.p2p.Peers().PeersToPrune()
         peerIds = s.filterNeededPeers(peerIds)
         for _, id := range peerIds {
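
Note: as a side check on the "Run twice per epoch." comment, with mainnet parameters (32 slots per epoch, 12 seconds per slot) the computed interval works out to 192 seconds, i.e. half an epoch. A tiny standalone sketch of the same arithmetic, using plain integers rather than Prysm's config types:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Mainnet parameters; Prysm reads these from params.BeaconConfig().
        const (
            slotsPerEpoch  = 32
            secondsPerSlot = 12
        )
        // Same arithmetic as the diff: (slots per epoch / 2) * seconds per slot.
        interval := time.Duration(slotsPerEpoch/2*secondsPerSlot) * time.Second
        fmt.Println(interval) // 3m12s, i.e. twice per 6m24s epoch
    }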