Mirror of https://github.com/OffchainLabs/prysm.git, synced 2026-01-10 13:58:09 -05:00

Compare commits: 4 commits, migrate-rp...refactor-p
| Author | SHA1 | Date |
|---|---|---|
|  | 9993547716 |  |
|  | feffe239d3 |  |
|  | f3430693c6 |  |
|  | 1b99283fb3 |  |
@@ -37,6 +37,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Updated k8s-io/client-go to v0.30.4 and k8s-io/apimachinery to v0.30.4
- Migrated tracing library from opencensus to opentelemetry for both the beacon node and validator.
- Refactored light client code to make it more readable and make future PRs easier.
- P2P: Refactored functions managing the peers to prune.

### Deprecated
- `--disable-grpc-gateway` flag is deprecated due to grpc gateway removal.

@@ -792,46 +792,61 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch primitives.Epoch) (
// bad response count. In the future scoring will be used
// to determine the most suitable peers to take out.
func (p *Status) PeersToPrune() []peer.ID {
if !features.Get().EnablePeerScorer {
return p.deprecatedPeersToPrune()
}
connLimit := p.ConnectedPeerLimit()
inBoundLimit := uint64(p.InboundLimit())
activePeers := p.Active()
numInboundPeers := uint64(len(p.InboundConnected()))
// Exit early if we are still below our max
// limit.
if uint64(len(activePeers)) <= connLimit {
activePeerCount := uint64(len(p.Active()))
inboundPeerCount := uint64(len(p.InboundConnected()))

// Exit early if we are still below our max limit.
if activePeerCount <= connLimit {
return []peer.ID{}
}

p.store.Lock()
defer p.store.Unlock()

type peerResp struct {
pid peer.ID
score float64
pid peer.ID
score float64
badResp int
}

peersToPrune := make([]*peerResp, 0)
// Select connected and inbound peers to prune.
// A peer is a candidate for pruning if:
// - it is connected, and
// - it is an inbound peer, and
// - it is not a trusted peer
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected &&
peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
score: p.scorers.ScoreNoLock(pid),
})
isConnected := peerData.ConnState == PeerConnected
isInbound := peerData.Direction == network.DirInbound
isTrusted := p.store.IsTrustedPeer(pid)

if isInbound && isConnected && !isTrusted {
peerToPrune := &peerResp{
pid: pid,
score: p.scorers.ScoreNoLock(pid),
badResp: peerData.BadResponses,
}

peersToPrune = append(peersToPrune, peerToPrune)
}
}

// Sort in ascending order to favour pruning peers with a
// lower score.
sort.Slice(peersToPrune, func(i, j int) bool {
return peersToPrune[i].score < peersToPrune[j].score
})
sortFunc := func(i, j int) bool {
return peersToPrune[i].badResp > peersToPrune[j].badResp
}

// Determine amount of peers to prune using our
// max connection limit.
amountToPrune, err := pmath.Sub64(uint64(len(activePeers)), connLimit)
if features.Get().EnablePeerScorer {
sortFunc = func(i, j int) bool {
return peersToPrune[i].score < peersToPrune[j].score
}
}

// Sort in ascending order to favour pruning peers with a lower score.
sort.Slice(peersToPrune, sortFunc)

// Determine amount of peers to prune using our max connection limit.
amountToPrune, err := pmath.Sub64(activePeerCount, connLimit)
if err != nil {
// This should never happen.
log.WithError(err).Error("Failed to determine amount of peers to prune")
@@ -840,86 +855,24 @@ func (p *Status) PeersToPrune() []peer.ID {
// Also check for inbound peers above our limit.
excessInbound := uint64(0)
if numInboundPeers > inBoundLimit {
excessInbound = numInboundPeers - inBoundLimit
if inboundPeerCount > inBoundLimit {
excessInbound = inboundPeerCount - inBoundLimit
}
// Prune the largest amount between excess peers and
// excess inbound peers.

// Prune the largest amount between excess peers and excess inbound peers.
if excessInbound > amountToPrune {
amountToPrune = excessInbound
}

if amountToPrune < uint64(len(peersToPrune)) {
peersToPrune = peersToPrune[:amountToPrune]
}

ids := make([]peer.ID, 0, len(peersToPrune))
for _, pr := range peersToPrune {
ids = append(ids, pr.pid)
}
return ids
}

// Deprecated: Is used to represent the older method
// of pruning which utilized bad response counts.
func (p *Status) deprecatedPeersToPrune() []peer.ID {
connLimit := p.ConnectedPeerLimit()
inBoundLimit := p.InboundLimit()
activePeers := p.Active()
numInboundPeers := len(p.InboundConnected())
// Exit early if we are still below our max
// limit.
if uint64(len(activePeers)) <= connLimit {
return []peer.ID{}
}
p.store.Lock()
defer p.store.Unlock()

type peerResp struct {
pid peer.ID
badResp int
}
peersToPrune := make([]*peerResp, 0)
// Select connected and inbound peers to prune.
for pid, peerData := range p.store.Peers() {
if peerData.ConnState == PeerConnected &&
peerData.Direction == network.DirInbound && !p.store.IsTrustedPeer(pid) {
peersToPrune = append(peersToPrune, &peerResp{
pid: pid,
badResp: peerData.BadResponses,
})
}
}

// Sort in descending order to favour pruning peers with a
// higher bad response count.
sort.Slice(peersToPrune, func(i, j int) bool {
return peersToPrune[i].badResp > peersToPrune[j].badResp
})

// Determine amount of peers to prune using our
// max connection limit.
amountToPrune, err := pmath.Sub64(uint64(len(activePeers)), connLimit)
if err != nil {
// This should never happen
log.WithError(err).Error("Failed to determine amount of peers to prune")
return []peer.ID{}
}
// Also check for inbound peers above our limit.
excessInbound := uint64(0)
if numInboundPeers > inBoundLimit {
excessInbound = uint64(numInboundPeers - inBoundLimit)
}
// Prune the largest amount between excess peers and
// excess inbound peers.
if excessInbound > amountToPrune {
amountToPrune = excessInbound
}
if amountToPrune < uint64(len(peersToPrune)) {
peersToPrune = peersToPrune[:amountToPrune]
}
ids := make([]peer.ID, 0, len(peersToPrune))
for _, pr := range peersToPrune {
ids = append(ids, pr.pid)
}
return ids
}
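
For reference, here is a simplified, self-contained sketch of the selection logic the refactored `PeersToPrune` applies in the diff above: candidates are connected, inbound, non-trusted peers; they are ordered by score ascending when the peer scorer is enabled (by bad-response count descending otherwise), and only the larger of the two excesses (over the connection limit or the inbound limit) is pruned. The `candidate` type and plain string IDs are illustrative stand-ins, not Prysm's actual peer-store types.

```go
package main

import (
	"fmt"
	"sort"
)

// candidate is a simplified, illustrative stand-in for the peerResp struct in the diff above.
type candidate struct {
	id      string
	score   float64
	badResp int
}

// peersToPrune mirrors the selection logic from the diff: sort candidates by score
// (ascending) when the peer scorer is enabled, otherwise by bad-response count
// (descending), then keep only the excess over the limits.
func peersToPrune(cands []candidate, active, inbound, connLimit, inboundLimit uint64, scorerEnabled bool) []string {
	// Nothing to prune while we are at or below the connection limit.
	if active <= connLimit {
		return nil
	}

	sortFunc := func(i, j int) bool { return cands[i].badResp > cands[j].badResp }
	if scorerEnabled {
		sortFunc = func(i, j int) bool { return cands[i].score < cands[j].score }
	}
	sort.Slice(cands, sortFunc)

	// Prune the larger of: peers above the connection limit, or inbound peers above the inbound limit.
	amount := active - connLimit
	if inbound > inboundLimit && inbound-inboundLimit > amount {
		amount = inbound - inboundLimit
	}
	if amount < uint64(len(cands)) {
		cands = cands[:amount]
	}

	ids := make([]string, 0, len(cands))
	for _, c := range cands {
		ids = append(ids, c.id)
	}
	return ids
}

func main() {
	cands := []candidate{
		{id: "peerA", score: 0.9, badResp: 0},
		{id: "peerB", score: -0.5, badResp: 3},
		{id: "peerC", score: 0.1, badResp: 1},
	}
	// 12 active peers against a limit of 10, 10 inbound against a limit of 8:
	// prune max(2, 2) = 2 candidates, lowest scores first.
	fmt.Println(peersToPrune(cands, 12, 10, 10, 8, true)) // [peerB peerC]
}
```

With the scorer enabled, the lowest-scoring inbound peers are dropped first; that ordering is the behavioural difference from the deprecated bad-response-count path shown above.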
@@ -28,34 +28,44 @@ import (
// maintainPeerStatuses by infrequently polling peers for their latest status.
func (s *Service) maintainPeerStatuses() {
// Run twice per epoch.
interval := time.Duration(params.BeaconConfig().SlotsPerEpoch.Div(2).Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
interval := time.Duration(slotsPerEpoch.Div(2).Mul(secondsPerSlot)) * time.Second

async.RunEvery(s.ctx, interval, func() {
wg := new(sync.WaitGroup)

for _, pid := range s.cfg.p2p.Peers().Connected() {
wg.Add(1)

go func(id peer.ID) {
defer wg.Done()
// If our peer status has not been updated correctly we disconnect over here
// and set the connection state over here instead.
if s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected {
isNotConnected := s.cfg.p2p.Host().Network().Connectedness(id) != network.Connected
if isNotConnected {
s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnecting)
if err := s.cfg.p2p.Disconnect(id); err != nil {
log.WithError(err).Debug("Error when disconnecting with peer")
}

s.cfg.p2p.Peers().SetConnectionState(id, peers.PeerDisconnected)
return
}

// Disconnect from peers that are considered bad by any of the registered scorers.
if s.cfg.p2p.Peers().IsBad(id) {
s.disconnectBadPeer(s.ctx, id)
return
}

// If the status hasn't been updated in the recent interval time.
lastUpdated, err := s.cfg.p2p.Peers().ChainStateLastUpdated(id)
if err != nil {
// Peer has vanished; nothing to do.
return
}

if prysmTime.Now().After(lastUpdated.Add(interval)) {
if err := s.reValidatePeer(s.ctx, id); err != nil {
log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
@@ -64,9 +74,11 @@ func (s *Service) maintainPeerStatuses() {
}
}(pid)
}
// Wait for all status checks to finish and then proceed onwards to
// pruning excess peers.

// Wait for all status checks to finish.
wg.Wait()

// Proceed onwards to pruning excess peers.
peerIds := s.cfg.p2p.Peers().PeersToPrune()
peerIds = s.filterNeededPeers(peerIds)
for _, id := range peerIds {
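
As a rough illustration of the "run twice per epoch" interval computed above, assuming mainnet-style parameters of 32 slots per epoch and 12 seconds per slot (assumed here; Prysm reads the real values from `params.BeaconConfig()`), the polling interval works out to half an epoch, i.e. 192 seconds:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed mainnet-style values, not read from the diff above.
	const slotsPerEpoch = 32
	const secondsPerSlot = 12

	// "Run twice per epoch": half an epoch expressed as a duration.
	interval := time.Duration(slotsPerEpoch/2*secondsPerSlot) * time.Second
	fmt.Println(interval) // 3m12s
}
```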