Add Better Peer Pruning (#8501)

* add better pruning

* add test

* gaz

* victor's review

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Nishant Das
2021-02-25 02:40:06 +08:00
committed by GitHub
parent 4d28d5e4d2
commit 0625a6906c
6 changed files with 172 additions and 9 deletions


@@ -133,3 +133,22 @@ func (s *subnetIDs) AddPersistentCommittee(pubkey []byte, comIndex []uint64, dur
 	s.persistentSubnets.Set(string(pubkey), comIndex, duration)
 }
+
+// EmptyAllCaches empties out all the related caches and flushes any stored
+// entries on them. This should only ever be used for testing; in normal
+// production, handling of the relevant subnets for each role is done
+// separately.
+func (s *subnetIDs) EmptyAllCaches() {
+	// Clear the caches.
+	s.attesterLock.Lock()
+	s.attester.Purge()
+	s.attesterLock.Unlock()
+
+	s.aggregatorLock.Lock()
+	s.aggregator.Purge()
+	s.aggregatorLock.Unlock()
+
+	s.subnetsLock.Lock()
+	s.persistentSubnets.Flush()
+	s.subnetsLock.Unlock()
+}
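Because cache.SubnetIDs is a shared, package-level cache, a test that seeds it should restore a clean state when it finishes. A minimal sketch of the intended test-side usage, mirroring the test added later in this commit; the package name, test name, and the slot/subnet values here are illustrative only:

package sync_test

import (
	"testing"

	types "github.com/prysmaticlabs/eth2-types"
	"github.com/prysmaticlabs/prysm/beacon-chain/cache"
)

func TestSeedsSubnetCaches(t *testing.T) {
	// Restore a clean cache state once the test finishes, so other tests
	// do not observe the subnets registered here.
	defer cache.SubnetIDs.EmptyAllCaches()

	// Register illustrative aggregator and attester subnet ids for slot 100.
	cache.SubnetIDs.AddAggregatorSubnetID(types.Slot(100), 10)
	cache.SubnetIDs.AddAttesterSubnetID(types.Slot(100), 20)
}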


@@ -656,7 +656,9 @@ func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch types.Epoch) (types
 // to determine the most suitable peers to take out.
 func (p *Status) PeersToPrune() []peer.ID {
 	connLimit := p.ConnectedPeerLimit()
+	inBoundLimit := p.InboundLimit()
 	activePeers := p.Active()
+	numInboundPeers := len(p.InboundConnected())
 	// Exit early if we are still below our max
 	// limit.
 	if len(activePeers) <= int(connLimit) {
@@ -672,7 +674,8 @@ func (p *Status) PeersToPrune() []peer.ID {
 	peersToPrune := make([]*peerResp, 0)
 	// Select connected and inbound peers to prune.
 	for pid, peerData := range p.store.Peers() {
-		if peerData.ConnState == PeerConnected && peerData.Direction == network.DirInbound {
+		if peerData.ConnState == PeerConnected &&
+			peerData.Direction == network.DirInbound {
 			peersToPrune = append(peersToPrune, &peerResp{
 				pid:     pid,
 				badResp: peerData.BadResponses,
@@ -690,6 +693,16 @@ func (p *Status) PeersToPrune() []peer.ID {
 	// max connection limit.
 	amountToPrune := len(activePeers) - int(connLimit)
+	// Also check for inbound peers above our limit.
+	excessInbound := 0
+	if numInboundPeers > inBoundLimit {
+		excessInbound = numInboundPeers - inBoundLimit
+	}
+	// Prune the largest amount between excess peers and
+	// excess inbound peers.
+	if excessInbound > amountToPrune {
+		amountToPrune = excessInbound
+	}
 	if amountToPrune < len(peersToPrune) {
 		peersToPrune = peersToPrune[:amountToPrune]
 	}
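A worked example of the new prune amount, using illustrative numbers that are not Prysm defaults: with connLimit = 45, inBoundLimit = 30, 50 active peers, and 38 inbound peers, the previous logic pruned 50 - 45 = 5 peers, while the new logic prunes max(5, 38 - 30) = 8, bringing the inbound count back down to its limit. A minimal standalone sketch of that computation:

package main

import "fmt"

// pruneAmount mirrors the computation added to PeersToPrune above, assuming
// the caller has already verified that active exceeds connLimit (PeersToPrune
// returns early otherwise).
func pruneAmount(active, inbound, connLimit, inboundLimit int) int {
	amount := active - connLimit
	// Also check for inbound peers above the inbound limit and take the
	// larger of the two excesses.
	if excess := inbound - inboundLimit; excess > amount {
		amount = excess
	}
	return amount
}

func main() {
	// Illustrative values only, not Prysm defaults.
	fmt.Println(pruneAmount(50, 38, 45, 30)) // prints 8
}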


@@ -136,6 +136,7 @@ go_test(
     shard_count = 4,
     deps = [
         "//beacon-chain/blockchain/testing:go_default_library",
+        "//beacon-chain/cache:go_default_library",
         "//beacon-chain/core/feed:go_default_library",
         "//beacon-chain/core/feed/state:go_default_library",
         "//beacon-chain/core/helpers:go_default_library",


@@ -68,6 +68,7 @@ func (s *Service) maintainPeerStatuses() {
 	// pruning excess peers.
 	wg.Wait()
 	peerIds := s.p2p.Peers().PeersToPrune()
+	peerIds = s.filterNeededPeers(peerIds)
 	for _, id := range peerIds {
 		if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeTooManyPeers, id); err != nil {
 			log.WithField("peer", id).WithError(err).Debug("Could not disconnect with peer")


@@ -11,6 +11,7 @@ import (
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -264,14 +265,7 @@ func (s *Service) subscribeDynamicWithSubnets(
 			if s.chainStarted.IsSet() && s.initialSync.Syncing() {
 				continue
 			}
-			// Persistent subscriptions from validators
-			persistentSubs := s.persistentSubnetIndices()
-			// Update desired topic indices for aggregator
-			wantedSubs := s.aggregatorSubnetIndices(currentSlot)
-			// Combine subscriptions to get all requested subscriptions
-			wantedSubs = sliceutil.SetUint64(append(persistentSubs, wantedSubs...))
+			wantedSubs := s.retrievePersistentSubs(currentSlot)
 			// Resize as appropriate.
 			s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
@@ -358,6 +352,65 @@ func (s *Service) validPeersExist(subnetTopic string) bool {
 	return uint64(len(numOfPeers)) >= params.BeaconNetworkConfig().MinimumPeersInSubnet
 }
+
+func (s *Service) retrievePersistentSubs(currSlot types.Slot) []uint64 {
+	// Persistent subscriptions from validators
+	persistentSubs := s.persistentSubnetIndices()
+	// Update desired topic indices for aggregator
+	wantedSubs := s.aggregatorSubnetIndices(currSlot)
+	// Combine subscriptions to get all requested subscriptions
+	return sliceutil.SetUint64(append(persistentSubs, wantedSubs...))
+}
+
+// filterNeededPeers removes peers that the node still needs in order to
+// function from the candidate prune list, so that peers serving our
+// attestation subnets are not pruned.
+func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
+	// Exit early if there is nothing to filter.
+	if len(pids) == 0 {
+		return pids
+	}
+	digest, err := s.forkDigest()
+	if err != nil {
+		log.WithError(err).Error("Could not compute fork digest")
+		return pids
+	}
+	currSlot := s.chain.CurrentSlot()
+	wantedSubs := s.retrievePersistentSubs(currSlot)
+	wantedSubs = sliceutil.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...))
+	topic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
+
+	// Map of peers in our wanted subnets.
+	peerMap := make(map[peer.ID]bool)
+	for _, sub := range wantedSubs {
+		subnetTopic := fmt.Sprintf(topic, digest, sub) + s.p2p.Encoding().ProtocolSuffix()
+		peers := s.p2p.PubSub().ListPeers(subnetTopic)
+		if len(peers) > int(params.BeaconNetworkConfig().MinimumPeersInSubnet) {
+			// In the event we have more than the minimum, we can
+			// mark the remaining as viable for pruning.
+			peers = peers[:params.BeaconNetworkConfig().MinimumPeersInSubnet]
+		}
+		// Add each peer to the peer map. Even if a peer id has already
+		// been seen, we still set it, as the outcome is the same.
+		for _, p := range peers {
+			peerMap[p] = true
+		}
+	}
+	// Remove the peers we still need from the prune list.
+	newPeers := make([]peer.ID, 0, len(pids))
+	for _, pid := range pids {
+		if peerMap[pid] {
+			continue
+		}
+		newPeers = append(newPeers, pid)
+	}
+	return newPeers
+}
 // Add fork digest to topic.
 func (s *Service) addDigestToTopic(topic string) string {
 	if !strings.Contains(topic, "%x") {
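For reference, the subnet topic that filterNeededPeers builds follows the attestation gossip format used in the test below ("/eth2/%x/beacon_attestation_%d"). A small sketch of how the full topic string comes together; the digest bytes and the "/ssz_snappy" suffix here are assumptions standing in for s.forkDigest() and s.p2p.Encoding().ProtocolSuffix():

package main

import "fmt"

func main() {
	// Illustrative digest and subnet index; in filterNeededPeers these come
	// from the fork digest and the wanted subnet indices.
	digest := [4]byte{0x01, 0x02, 0x03, 0x04}
	sub := uint64(10)
	topicFormat := "/eth2/%x/beacon_attestation_%d"

	// The encoder suffix ("/ssz_snappy" here) is appended by
	// s.p2p.Encoding().ProtocolSuffix() in the real code.
	subnetTopic := fmt.Sprintf(topicFormat, digest, sub) + "/ssz_snappy"
	fmt.Println(subnetTopic) // /eth2/01020304/beacon_attestation_10/ssz_snappy
}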


@@ -13,8 +13,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
types "github.com/prysmaticlabs/eth2-types"
pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" mockChain "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
@@ -356,3 +358,77 @@ func Test_wrapAndReportValidation(t *testing.T) {
 		})
 	}
 }
+
+func TestFilterSubnetPeers(t *testing.T) {
+	p := p2ptest.NewTestP2P(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	currSlot := types.Slot(100)
+	r := Service{
+		ctx: ctx,
+		chain: &mockChain.ChainService{
+			Genesis:        time.Now(),
+			ValidatorsRoot: [32]byte{'A'},
+			Slot:           &currSlot,
+		},
+		p2p:          p,
+		chainStarted: abool.New(),
+	}
+	// Empty the caches at the end of the test.
+	defer cache.SubnetIDs.EmptyAllCaches()
+
+	defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.p2p.Encoding().ProtocolSuffix()
+	subnet10 := r.addDigestAndIndexToTopic(defaultTopic, 10)
+	cache.SubnetIDs.AddAggregatorSubnetID(currSlot, 10)
+
+	subnet20 := r.addDigestAndIndexToTopic(defaultTopic, 20)
+	cache.SubnetIDs.AddAttesterSubnetID(currSlot, 20)
+
+	p1 := createPeer(t, subnet10)
+	p2 := createPeer(t, subnet10, subnet20)
+	p3 := createPeer(t)
+
+	// Connect to all peers.
+	p.Connect(p1)
+	p.Connect(p2)
+	p.Connect(p3)
+	// Sleep a while to allow peers to connect.
+	time.Sleep(100 * time.Millisecond)
+
+	wantedPeers := []peer.ID{p1.PeerID(), p2.PeerID(), p3.PeerID()}
+	// Expect peer 3 to be the only peer marked as suitable for pruning.
+	recPeers := r.filterNeededPeers(wantedPeers)
+	assert.DeepEqual(t, []peer.ID{p3.PeerID()}, recPeers)
+
+	// Try with only peers from subnet 20.
+	wantedPeers = []peer.ID{p2.BHost.ID()}
+	// Connect an excess amount of peers to the particular subnet.
+	for i := uint64(1); i <= params.BeaconNetworkConfig().MinimumPeersInSubnet; i++ {
+		nPeer := createPeer(t, subnet20)
+		p.Connect(nPeer)
+		wantedPeers = append(wantedPeers, nPeer.BHost.ID())
+		time.Sleep(100 * time.Millisecond)
+	}
+	recPeers = r.filterNeededPeers(wantedPeers)
+	assert.DeepEqual(t, 1, len(recPeers), "expected at least 1 suitable peer to prune")
+
+	cancel()
+}
+
+// createPeer creates a peer and registers it to the provided topics.
+func createPeer(t *testing.T, topics ...string) *p2ptest.TestP2P {
+	p := p2ptest.NewTestP2P(t)
+	for _, tp := range topics {
+		jTop, err := p.PubSub().Join(tp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		_, err = jTop.Subscribe()
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	return p
+}