Clean Up P2P Service (#6574)

* clean up
* clean up
* gaz
* preston's review
* preston's review
* Merge refs/heads/master into CleanUpP2P
This commit is contained in:
Nishant Das
2020-07-13 12:16:24 +08:00
committed by GitHub
parent 62df4995e6
commit b5bd1260d0
7 changed files with 246 additions and 236 deletions

View File

@@ -19,7 +19,7 @@ go_library(
"log.go",
"monitoring.go",
"options.go",
"pubsub_message_id.go",
"pubsub.go",
"rpc_topic_mappings.go",
"sender.go",
"service.go",

View File

@@ -1,9 +1,11 @@
package p2p
import (
"bytes"
"crypto/ecdsa"
"fmt"
"net"
"time"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
@@ -13,6 +15,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
)
// Listener defines the discovery V5 network interface that is used
@@ -28,6 +32,68 @@ type Listener interface {
LocalNode() *enode.LocalNode
}
// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
func (s *Service) RefreshENR() {
// return early if discv5 isnt running
if s.dv5Listener == nil {
return
}
bitV := bitfield.NewBitvector64()
committees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range committees {
bitV.SetBitAt(idx, true)
}
currentBitV, err := retrieveBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.Errorf("Could not retrieve bitfield: %v", err)
return
}
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
}
s.updateSubnetRecordWithMetadata(bitV)
// ping all peers to inform them of new metadata
s.pingPeers()
}
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
iterator := s.dv5Listener.RandomNodes()
iterator = enode.Filter(iterator, s.filterPeer)
defer iterator.Close()
for {
// Exit if service's context is canceled
if s.ctx.Err() != nil {
break
}
if s.isPeerAtLimit() {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
exists := iterator.Next()
if !exists {
break
}
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(peerInfo)
}
}
func (s *Service) createListener(
ipAddr net.IP,
privKey *ecdsa.PrivateKey,

View File

@@ -1,5 +1,53 @@
package p2p
import "github.com/sirupsen/logrus"
import (
"strconv"
"strings"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "p2p")
func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) {
var correctAddr ma.Multiaddr
for _, addr := range addrs {
if strings.Contains(addr.String(), "/ip4/") || strings.Contains(addr.String(), "/ip6/") {
correctAddr = addr
break
}
}
if correctAddr != nil {
log.WithField(
"multiAddr",
correctAddr.String()+"/p2p/"+id.String(),
).Info("Node started p2p server")
}
}
func logExternalIPAddr(id peer.ID, addr string, port uint) {
if addr != "" {
multiAddr, err := multiAddressBuilder(addr, port)
if err != nil {
log.Errorf("Could not create multiaddress: %v", err)
return
}
log.WithField(
"multiAddr",
multiAddr.String()+"/p2p/"+id.String(),
).Info("Node started external p2p server")
}
}
func logExternalDNSAddr(id peer.ID, addr string, port uint) {
if addr != "" {
p := strconv.FormatUint(uint64(port), 10)
log.WithField(
"multiAddr",
"/dns4/"+addr+"/tcp/"+p+"/p2p/"+id.String(),
).Info("Node started external p2p server")
}
}

View File

@@ -0,0 +1,62 @@
package p2p
import (
"context"
"encoding/base64"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/shared/hashutil"
)
// JoinTopic will join PubSub topic, if not already joined.
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
if _, ok := s.joinedTopics[topic]; !ok {
topicHandle, err := s.pubsub.Join(topic, opts...)
if err != nil {
return nil, err
}
s.joinedTopics[topic] = topicHandle
}
return s.joinedTopics[topic], nil
}
// LeaveTopic closes topic and removes corresponding handler from list of joined topics.
// This method will return error if there are outstanding event handlers or subscriptions.
func (s *Service) LeaveTopic(topic string) error {
if t, ok := s.joinedTopics[topic]; ok {
if err := t.Close(); err != nil {
return err
}
delete(s.joinedTopics, topic)
}
return nil
}
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return err
}
return topicHandle.Publish(ctx, data, opts...)
}
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return nil, err
}
return topicHandle.Subscribe(opts...)
}
// Content addressable ID function.
//
// ETH2 spec defines the message ID as:
// message-id: base64(SHA256(message.data))
func msgIDFunction(pmsg *pubsub_pb.Message) string {
h := hashutil.FastSum256(pmsg.Data)
return base64.URLEncoding.EncodeToString(h[:])
}

View File

@@ -1,17 +0,0 @@
package p2p
import (
"encoding/base64"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/shared/hashutil"
)
// Content addressable ID function.
//
// Loosely defined as Base64(sha2(data)) until a formal specification is determined.
// Pending: https://github.com/ethereum/eth2.0-specs/issues/1528
func msgIDFunction(pmsg *pubsub_pb.Message) string {
h := hashutil.FastSum256(pmsg.Data)
return base64.URLEncoding.EncodeToString(h[:])
}

View File

@@ -4,11 +4,8 @@
package p2p
import (
"bytes"
"context"
"crypto/ecdsa"
"strconv"
"strings"
"time"
"github.com/dgraph-io/ristretto"
@@ -24,8 +21,6 @@ import (
filter "github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
@@ -289,49 +284,6 @@ func (s *Service) SetStreamHandler(topic string, handler network.StreamHandler)
s.host.SetStreamHandler(protocol.ID(topic), handler)
}
// JoinTopic will join PubSub topic, if not already joined.
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
if _, ok := s.joinedTopics[topic]; !ok {
topicHandle, err := s.pubsub.Join(topic, opts...)
if err != nil {
return nil, err
}
s.joinedTopics[topic] = topicHandle
}
return s.joinedTopics[topic], nil
}
// LeaveTopic closes topic and removes corresponding handler from list of joined topics.
// This method will return error if there are outstanding event handlers or subscriptions.
func (s *Service) LeaveTopic(topic string) error {
if t, ok := s.joinedTopics[topic]; ok {
if err := t.Close(); err != nil {
return err
}
delete(s.joinedTopics, topic)
}
return nil
}
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return err
}
return topicHandle.Publish(ctx, data, opts...)
}
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return nil, err
}
return topicHandle.Subscribe(opts...)
}
// PeerID returns the Peer ID of the local peer.
func (s *Service) PeerID() peer.ID {
return s.host.ID()
@@ -370,86 +322,6 @@ func (s *Service) MetadataSeq() uint64 {
return s.metaData.SeqNumber
}
// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
func (s *Service) RefreshENR() {
// return early if discv5 isnt running
if s.dv5Listener == nil {
return
}
bitV := bitfield.NewBitvector64()
committees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range committees {
bitV.SetBitAt(idx, true)
}
currentBitV, err := retrieveBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.Errorf("Could not retrieve bitfield: %v", err)
return
}
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
}
s.updateSubnetRecordWithMetadata(bitV)
// ping all peers to inform them of new metadata
s.pingPeers()
}
// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers.
func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
if s.dv5Listener == nil {
// return if discovery isn't set
return false, nil
}
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
exists := false
for _, node := range nodes {
if node.IP() == nil {
continue
}
// do not look for nodes with no tcp port set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
}
continue
}
subnets, err := retrieveAttSubnets(node.Record())
if err != nil {
log.Debugf("could not retrieve subnets: %v", err)
continue
}
for _, comIdx := range subnets {
if comIdx == index {
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
if s.peers.IsActive(info.ID) {
exists = true
continue
}
if s.host.Network().Connectedness(info.ID) == network.Connected {
exists = true
continue
}
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
continue
}
exists = true
}
}
}
return exists, nil
}
// AddPingMethod adds the metadata ping rpc method to the p2p service, so that it can
// be used to refresh ENR.
func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) error) {
@@ -492,41 +364,6 @@ func (s *Service) awaitStateInitialized() {
}
}
// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
func (s *Service) listenForNewNodes() {
iterator := s.dv5Listener.RandomNodes()
iterator = enode.Filter(iterator, s.filterPeer)
defer iterator.Close()
for {
// Exit if service's context is canceled
if s.ctx.Err() != nil {
break
}
if s.isPeerAtLimit() {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
exists := iterator.Next()
if !exists {
break
}
node := iterator.Node()
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
}(peerInfo)
}
}
func (s *Service) connectWithAllPeers(multiAddrs []ma.Multiaddr) {
addrInfos, err := peer.AddrInfosFromP2pAddrs(multiAddrs...)
if err != nil {
@@ -579,57 +416,3 @@ func (s *Service) connectToBootnodes() error {
s.connectWithAllPeers(multiAddresses)
return nil
}
// Updates the service's discv5 listener record's attestation subnet
// with a new value for a bitfield of subnets tracked. It also updates
// the node's metadata by increasing the sequence number and the
// subnets tracked by the node.
func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) {
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
s.dv5Listener.LocalNode().Set(entry)
s.metaData = &pb.MetaData{
SeqNumber: s.metaData.SeqNumber + 1,
Attnets: bitV,
}
}
func logIPAddr(id peer.ID, addrs ...ma.Multiaddr) {
var correctAddr ma.Multiaddr
for _, addr := range addrs {
if strings.Contains(addr.String(), "/ip4/") || strings.Contains(addr.String(), "/ip6/") {
correctAddr = addr
break
}
}
if correctAddr != nil {
log.WithField(
"multiAddr",
correctAddr.String()+"/p2p/"+id.String(),
).Info("Node started p2p server")
}
}
func logExternalIPAddr(id peer.ID, addr string, port uint) {
if addr != "" {
multiAddr, err := multiAddressBuilder(addr, port)
if err != nil {
log.Errorf("Could not create multiaddress: %v", err)
return
}
log.WithField(
"multiAddr",
multiAddr.String()+"/p2p/"+id.String(),
).Info("Node started external p2p server")
}
}
func logExternalDNSAddr(id peer.ID, addr string, port uint) {
if addr != "" {
p := strconv.FormatUint(uint64(port), 10)
log.WithField(
"multiAddr",
"/dns4/"+addr+"/tcp/"+p+"/p2p/"+id.String(),
).Info("Node started external p2p server")
}
}

View File

@@ -3,7 +3,9 @@ package p2p
import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/go-bitfield"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
)
@@ -11,6 +13,72 @@ var attestationSubnetCount = params.BeaconNetworkConfig().AttestationSubnetCount
var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers.
func (s *Service) FindPeersWithSubnet(index uint64) (bool, error) {
if s.dv5Listener == nil {
// return if discovery isn't set
return false, nil
}
iterator := s.dv5Listener.RandomNodes()
nodes := enode.ReadNodes(iterator, lookupLimit)
exists := false
for _, node := range nodes {
if node.IP() == nil {
continue
}
// do not look for nodes with no tcp port set
if err := node.Record().Load(enr.WithEntry("tcp", new(enr.TCP))); err != nil {
if !enr.IsNotFound(err) {
log.WithError(err).Debug("Could not retrieve tcp port")
}
continue
}
subnets, err := retrieveAttSubnets(node.Record())
if err != nil {
log.Debugf("could not retrieve subnets: %v", err)
continue
}
for _, comIdx := range subnets {
if comIdx == index {
info, multiAddr, err := convertToAddrInfo(node)
if err != nil {
return false, err
}
if s.peers.IsActive(info.ID) {
exists = true
continue
}
if s.host.Network().Connectedness(info.ID) == network.Connected {
exists = true
continue
}
s.peers.Add(node.Record(), info.ID, multiAddr, network.DirUnknown)
if err := s.connectWithPeer(*info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
continue
}
exists = true
}
}
}
return exists, nil
}
// Updates the service's discv5 listener record's attestation subnet
// with a new value for a bitfield of subnets tracked. It also updates
// the node's metadata by increasing the sequence number and the
// subnets tracked by the node.
func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) {
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
s.dv5Listener.LocalNode().Set(entry)
s.metaData = &pb.MetaData{
SeqNumber: s.metaData.SeqNumber + 1,
Attnets: bitV,
}
}
// Initializes a bitvector of attestation subnets beacon nodes is subscribed to
// and creates a new ENR entry with its default value.
func intializeAttSubnets(node *enode.LocalNode) *enode.LocalNode {