From ce397ce797c33dbcf77fa7670c356844ef6aad43 Mon Sep 17 00:00:00 2001
From: Nishant Das
Date: Fri, 15 Jan 2021 05:28:20 +0800
Subject: [PATCH] Prune Excess Peers Better (#8260)

* add method
* add changes
* formatting
* choose
* fix waitgroup
* add
* add debug logs
* gaz
* make it better
* fix
* godoc

Co-authored-by: Raul Jordan
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
---
 beacon-chain/p2p/peers/status.go      | 51 ++++++++++++++++++++++
 beacon-chain/p2p/peers/status_test.go | 61 +++++++++++++++++++++++++++
 beacon-chain/sync/rpc_status.go       | 13 ++++++
 3 files changed, 125 insertions(+)

diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go
index 0bc15a528a..63ebd286c0 100644
--- a/beacon-chain/p2p/peers/status.go
+++ b/beacon-chain/p2p/peers/status.go
@@ -614,6 +614,57 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch uint64) (uint64, []
     return targetEpoch, potentialPIDs
 }
 
+// PeersToPrune selects the most suitable inbound peers
+// to disconnect the host peer from. As of this moment
+// the pruning relies on simple heuristics such as
+// bad response count. In the future, scoring will be used
+// to determine the most suitable peers to take out.
+func (p *Status) PeersToPrune() []peer.ID {
+    connLimit := p.ConnectedPeerLimit()
+    activePeers := p.Active()
+    // Exit early if we are still below our max
+    // limit.
+    if len(activePeers) <= int(connLimit) {
+        return []peer.ID{}
+    }
+    p.store.Lock()
+    defer p.store.Unlock()
+
+    type peerResp struct {
+        pid     peer.ID
+        badResp int
+    }
+    peersToPrune := make([]*peerResp, 0)
+    // Select connected inbound peers as candidates for pruning.
+    for pid, peerData := range p.store.Peers() {
+        if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound {
+            peersToPrune = append(peersToPrune, &peerResp{
+                pid:     pid,
+                badResp: peerData.BadResponses,
+            })
+        }
+    }
+
+    // Sort in descending order to favour pruning peers with a
+    // higher bad response count.
+    sort.Slice(peersToPrune, func(i, j int) bool {
+        return peersToPrune[i].badResp > peersToPrune[j].badResp
+    })
+
+    // Determine the number of peers to prune using our
+    // max connection limit.
+    amountToPrune := len(activePeers) - int(connLimit)
+
+    if amountToPrune < len(peersToPrune) {
+        peersToPrune = peersToPrune[:amountToPrune]
+    }
+    ids := make([]peer.ID, 0, len(peersToPrune))
+    for _, pr := range peersToPrune {
+        ids = append(ids, pr.pid)
+    }
+    return ids
+}
+
 // HighestEpoch returns the highest epoch reported epoch amongst peers.
 func (p *Status) HighestEpoch() uint64 {
     p.store.RLock()
diff --git a/beacon-chain/p2p/peers/status_test.go b/beacon-chain/p2p/peers/status_test.go
index 19c6fa85ba..f8240efc51 100644
--- a/beacon-chain/p2p/peers/status_test.go
+++ b/beacon-chain/p2p/peers/status_test.go
@@ -657,6 +657,67 @@ func TestAtInboundPeerLimit(t *testing.T) {
     assert.Equal(t, true, p.IsAboveInboundLimit(), "Inbound limit not exceeded")
 }
 
+func TestPrunePeers(t *testing.T) {
+    p := peers.NewStatus(context.Background(), &peers.StatusConfig{
+        PeerLimit: 30,
+        ScorerParams: &scorers.Config{
+            BadResponsesScorerConfig: &scorers.BadResponsesScorerConfig{
+                Threshold: 1,
+            },
+        },
+    })
+    for i := 0; i < 15; i++ {
+        // Peer added to peer handler.
+        createPeer(t, p, nil, network.DirOutbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+    }
+    // Assert there are no prunable peers.
+    peersToPrune := p.PeersToPrune()
+    assert.Equal(t, 0, len(peersToPrune))
+
+    for i := 0; i < 18; i++ {
+        // Peer added to peer handler.
+        createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+    }
+
+    // Assert the correct peers are prunable.
+    peersToPrune = p.PeersToPrune()
+    assert.Equal(t, 3, len(peersToPrune))
+
+    // Add in more peers.
+    for i := 0; i < 13; i++ {
+        // Peer added to peer handler.
+        createPeer(t, p, nil, network.DirInbound, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
+    }
+
+    // Set up bad scores for inbound peers.
+    inboundPeers := p.Inbound()
+    for i, pid := range inboundPeers {
+        modulo := i % 5
+        // Increment bad scores for peers.
+        for j := 0; j < modulo; j++ {
+            p.Scorers().BadResponsesScorer().Increment(pid)
+        }
+    }
+    // Assert all peers above the connection limit are prunable.
+    peersToPrune = p.PeersToPrune()
+    assert.Equal(t, 16, len(peersToPrune))
+    for _, pid := range peersToPrune {
+        dir, err := p.Direction(pid)
+        require.NoError(t, err)
+        assert.Equal(t, network.DirInbound, dir)
+    }
+
+    // Ensure the result is sorted in descending order of bad responses.
+    currCount, err := p.Scorers().BadResponsesScorer().Count(peersToPrune[0])
+    require.NoError(t, err)
+    for _, pid := range peersToPrune {
+        count, err := p.Scorers().BadResponsesScorer().Count(pid)
+        require.NoError(t, err)
+        assert.Equal(t, true, currCount >= count)
+        currCount = count
+    }
+}
+
 func TestStatus_BestPeer(t *testing.T) {
     type peerConfig struct {
         headSlot uint64
diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go
index 787d09c230..fa2484725e 100644
--- a/beacon-chain/sync/rpc_status.go
+++ b/beacon-chain/sync/rpc_status.go
@@ -3,6 +3,7 @@ package sync
 import (
     "bytes"
     "context"
+    "sync"
     "time"
 
     libp2pcore "github.com/libp2p/go-libp2p-core"
@@ -28,8 +29,11 @@ func (s *Service) maintainPeerStatuses() {
     // Run twice per epoch.
     interval := time.Duration(params.BeaconConfig().SecondsPerSlot*params.BeaconConfig().SlotsPerEpoch/2) * time.Second
     runutil.RunEvery(s.ctx, interval, func() {
+        wg := new(sync.WaitGroup)
         for _, pid := range s.p2p.Peers().Connected() {
+            wg.Add(1)
             go func(id peer.ID) {
+                defer wg.Done()
                 // If our peer status has not been updated correctly we disconnect over here
                 // and set the connection state over here instead.
                 if s.p2p.Host().Network().Connectedness(id) != network.Connected {
@@ -59,6 +63,15 @@ func (s *Service) maintainPeerStatuses() {
                 }
             }(pid)
         }
+        // Wait for all status checks to finish and then proceed onwards to
+        // pruning excess peers.
+        wg.Wait()
+        peerIds := s.p2p.Peers().PeersToPrune()
+        for _, id := range peerIds {
+            if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil {
+                log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer")
+            }
+        }
     })
 }
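
For readers who want the selection heuristic in isolation, the sketch below mirrors the shape of the PeersToPrune method added above: collect the inbound candidates, sort them by bad response count in descending order, and trim to the overshoot past the connection limit. The peerInfo type, string peer IDs, and standalone peersToPrune function here are hypothetical stand-ins for illustration, not Prysm code.

package main

import (
    "fmt"
    "sort"
)

// peerInfo is a hypothetical stand-in for Prysm's per-peer store entry:
// just an ID and a bad-response count.
type peerInfo struct {
    id      string
    badResp int
}

// peersToPrune mirrors the heuristic in the patch: given the connected
// inbound peers, the total active peer count, and the connection limit,
// return the IDs of the excess peers with the worst bad-response counts.
func peersToPrune(inbound []peerInfo, activeCount, connLimit int) []string {
    // Exit early if we are still below the limit.
    if activeCount <= connLimit {
        return nil
    }
    // Sort in descending order to favour pruning peers with a
    // higher bad response count.
    sort.Slice(inbound, func(i, j int) bool {
        return inbound[i].badResp > inbound[j].badResp
    })
    // Only prune the overshoot past the limit.
    amount := activeCount - connLimit
    if amount < len(inbound) {
        inbound = inbound[:amount]
    }
    ids := make([]string, 0, len(inbound))
    for _, p := range inbound {
        ids = append(ids, p.id)
    }
    return ids
}

func main() {
    inbound := []peerInfo{
        {"peerA", 0}, {"peerB", 3}, {"peerC", 1}, {"peerD", 2},
    }
    // 32 active peers against a limit of 30: the two inbound peers with
    // the worst bad-response counts are selected.
    fmt.Println(peersToPrune(inbound, 32, 30)) // [peerB peerD]
}

Note that, as in the patch, only inbound peers are ever candidates; outbound connections the node dialed itself are left alone, which is why the test above expects zero prunable peers while all connections are outbound.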
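
The rpc_status.go change is a fan-out/fan-in: each connected peer gets its status check in its own goroutine, and a sync.WaitGroup makes the ticker callback block until every check has returned before pruning runs, so the prune decision sees up-to-date connection state. A minimal sketch of that pattern follows, where checkPeer is a hypothetical stand-in for the real per-peer status round trip.

package main

import (
    "fmt"
    "sync"
    "time"
)

// checkPeer is a hypothetical stand-in for the per-peer status request.
func checkPeer(id string) {
    time.Sleep(10 * time.Millisecond) // simulate a network round trip
    fmt.Println("checked", id)
}

func main() {
    peerIDs := []string{"peerA", "peerB", "peerC"}

    wg := new(sync.WaitGroup)
    for _, pid := range peerIDs {
        wg.Add(1)
        // Pass pid as an argument so each goroutine gets its own copy,
        // matching the go func(id peer.ID) { ... }(pid) form in the patch.
        go func(id string) {
            defer wg.Done()
            checkPeer(id)
        }(pid)
    }
    // Wait for all status checks to finish, mirroring the wg.Wait() the
    // patch adds before Peers().PeersToPrune().
    wg.Wait()
    fmt.Println("all checks done; safe to prune excess peers")
}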