Move subnet topics to global vars (#6525)

* Deduplicate subnet topic definitions
* Move topics to global file
* Gaz
* Merge branch 'master' into move-topics
* Fix
* Merge refs/heads/master into move-topics
* Bazel
* Merge branch 'move-topics' of github.com:prysmaticlabs/prysm into move-topics
* Fix tests
* Fix
* Undo e2e changes
* Revert "Undo e2e changes"

This reverts commit 3037bb3590.
* Fix
* Fix
* Merge refs/heads/master into move-topics
* Merge refs/heads/master into move-topics
* Merge refs/heads/master into move-topics
* Comments
* Merge refs/heads/master into move-topics
* Merge refs/heads/master into move-topics
This commit is contained in:
Ivan Martinez
2020-07-09 13:38:15 -04:00
committed by GitHub
parent 47cbfbf437
commit 322998f7f1
14 changed files with 96 additions and 41 deletions

View File

@@ -24,12 +24,14 @@ go_library(
"sender.go",
"service.go",
"subnets.go",
"topics.go",
"utils.go",
"watch_peers.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/p2p",
visibility = [
"//beacon-chain:__subpackages__",
"//endtoend/evaluators:__pkg__",
"//fuzz:__pkg__",
"//tools:__subpackages__",
],

View File

@@ -18,8 +18,6 @@ import (
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
const attestationSubnetTopicFormat = "/eth2/%x/beacon_attestation_%d"
// Broadcast a message to the p2p network.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
@@ -81,5 +79,5 @@ func (s *Service) broadcastObject(ctx context.Context, obj interface{}, topic st
}
func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(attestationSubnetTopicFormat, forkDigest, subnet)
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}

View File

@@ -96,8 +96,8 @@ func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
}
func TestService_Attestation_Subnet(t *testing.T) {
if gtm := GossipTypeMapping[reflect.TypeOf(&eth.Attestation{})]; gtm != attestationSubnetTopicFormat {
t.Errorf("Constant is out of date. Wanted %s, got %s", attestationSubnetTopicFormat, gtm)
if gtm := GossipTypeMapping[reflect.TypeOf(&eth.Attestation{})]; gtm != AttestationSubnetTopicFormat {
t.Errorf("Constant is out of date. Wanted %s, got %s", AttestationSubnetTopicFormat, gtm)
}
tests := []struct {

View File

@@ -31,7 +31,7 @@ type Listener interface {
func (s *Service) createListener(
ipAddr net.IP,
privKey *ecdsa.PrivateKey,
) *discover.UDPv5 {
) (*discover.UDPv5, error) {
udpAddr := &net.UDPAddr{
IP: ipAddr,
Port: int(s.cfg.UDPPort),
@@ -45,7 +45,7 @@ func (s *Service) createListener(
}
conn, err := net.ListenUDP(networkVersion, udpAddr)
if err != nil {
log.Fatal(err)
return nil, errors.Wrap(err, "could not listen to UDP")
}
localNode, err := s.createLocalNode(
privKey,
@@ -54,7 +54,7 @@ func (s *Service) createListener(
int(s.cfg.TCPPort),
)
if err != nil {
log.Fatal(err)
return nil, errors.Wrap(err, "could not create local node")
}
if s.cfg.HostAddress != "" {
hostIP := net.ParseIP(s.cfg.HostAddress)
@@ -72,16 +72,16 @@ func (s *Service) createListener(
for _, addr := range s.cfg.Discv5BootStrapAddr {
bootNode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil {
log.Fatal(err)
return nil, errors.Wrap(err, "could not bootstrap addr")
}
dv5Cfg.Bootnodes = append(dv5Cfg.Bootnodes, bootNode)
}
network, err := discover.ListenV5(conn, localNode, dv5Cfg)
if err != nil {
log.Fatal(err)
return nil, errors.Wrap(err, "could not listen to discV5")
}
return network
return network, nil
}
func (s *Service) createLocalNode(
@@ -115,7 +115,10 @@ func (s *Service) startDiscoveryV5(
addr net.IP,
privKey *ecdsa.PrivateKey,
) (*discover.UDPv5, error) {
listener := s.createListener(addr, privKey)
listener, err := s.createListener(addr, privKey)
if err != nil {
return nil, errors.Wrap(err, "could not create listener")
}
record := listener.Self()
log.WithField("ENR", record.String()).Info("Started discovery v5")
return listener, nil

View File

@@ -56,7 +56,10 @@ func TestCreateListener(t *testing.T) {
genesisValidatorsRoot: []byte{'A'},
cfg: &Config{UDPPort: uint(port)},
}
listener := s.createListener(ipAddr, pkey)
listener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer listener.Close()
if !listener.Self().IP().Equal(ipAddr) {
@@ -85,7 +88,10 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
bootListener := s.createListener(ipAddr, pkey)
bootListener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer bootListener.Close()
bootNode := bootListener.Self()
@@ -155,7 +161,10 @@ func TestMultiAddrConversion_OK(t *testing.T) {
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}
listener := s.createListener(ipAddr, pkey)
listener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer listener.Close()
_ = convertToMultiAddr([]*enode.Node{listener.Self()})

View File

@@ -32,7 +32,10 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
bootListener := s.createListener(ipAddr, pkey)
bootListener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer bootListener.Close()
bootNode := bootListener.Self()
@@ -85,7 +88,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
s, err = NewService(cfg)
if err != nil {
t.Fatal(err)
}
@@ -125,7 +128,10 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
bootListener := s.createListener(ipAddr, pkey)
bootListener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer bootListener.Close()
bootNode := bootListener.Self()
@@ -182,7 +188,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
cfg.MaxPeers = 30
s, err := NewService(cfg)
s, err = NewService(cfg)
if err != nil {
t.Fatal(err)
}

View File

@@ -10,12 +10,12 @@ import (
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
"/eth2/%x/beacon_block": &pb.SignedBeaconBlock{},
"/eth2/%x/beacon_attestation_%d": &pb.Attestation{},
"/eth2/%x/voluntary_exit": &pb.SignedVoluntaryExit{},
"/eth2/%x/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/%x/attester_slashing": &pb.AttesterSlashing{},
"/eth2/%x/beacon_aggregate_and_proof": &pb.SignedAggregateAttestationAndProof{},
BlockSubnetTopicFormat: &pb.SignedBeaconBlock{},
AttestationSubnetTopicFormat: &pb.Attestation{},
ExitSubnetTopicFormat: &pb.SignedVoluntaryExit{},
ProposerSlashingSubnetTopicFormat: &pb.ProposerSlashing{},
AttesterSlashingSubnetTopicFormat: &pb.AttesterSlashing{},
AggregateAndProofSubnetTopicFormat: &pb.SignedAggregateAttestationAndProof{},
}
// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message

View File

@@ -181,7 +181,7 @@ func (s *Service) Start() {
s.privKey,
)
if err != nil {
log.WithError(err).Error("Failed to start discovery")
log.WithError(err).Fatal("Failed to start discovery")
s.startupErr = err
return
}

View File

@@ -161,7 +161,10 @@ func TestListenForNewNodes(t *testing.T) {
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
bootListener := s.createListener(ipAddr, pkey)
bootListener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer bootListener.Close()
// Use shorter period for testing.
@@ -216,7 +219,7 @@ func TestListenForNewNodes(t *testing.T) {
cfg.UDPPort = 14000
cfg.TCPPort = 14001
s, err := NewService(cfg)
s, err = NewService(cfg)
if err != nil {
t.Fatal(err)
}

View File

@@ -24,7 +24,10 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot,
}
bootListener := s.createListener(ipAddr, pkey)
bootListener, err := s.createListener(ipAddr, pkey)
if err != nil {
t.Fatal(err)
}
defer bootListener.Close()
bootNode := bootListener.Self()
@@ -77,7 +80,7 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
UDPPort: uint(port),
}
cfg.StateNotifier = &mock.MockStateNotifier{}
s, err := NewService(cfg)
s, err = NewService(cfg)
if err != nil {
t.Fatal(err)
}

View File

@@ -0,0 +1,16 @@
package p2p
const (
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
AttestationSubnetTopicFormat = "/eth2/%x/beacon_attestation_%d"
// BlockSubnetTopicFormat is the topic format for the block subnet.
BlockSubnetTopicFormat = "/eth2/%x/beacon_block"
// ExitSubnetTopicFormat is the topic format for the voluntary exit subnet.
ExitSubnetTopicFormat = "/eth2/%x/voluntary_exit"
// ProposerSlashingSubnetTopicFormat is the topic format for the proposer slashing subnet.
ProposerSlashingSubnetTopicFormat = "/eth2/%x/proposer_slashing"
// AttesterSlashingSubnetTopicFormat is the topic format for the attester slashing subnet.
AttesterSlashingSubnetTopicFormat = "/eth2/%x/attester_slashing"
// AggregateAndProofSubnetTopicFormat is the topic format for the aggregate and proof subnet.
AggregateAndProofSubnetTopicFormat = "/eth2/%x/beacon_aggregate_and_proof"
)

View File

@@ -48,27 +48,27 @@ func (s *Service) noopValidator(ctx context.Context, _ peer.ID, msg *pubsub.Mess
// Register PubSub subscribers
func (s *Service) registerSubscribers() {
s.subscribe(
"/eth2/%x/beacon_block",
p2p.BlockSubnetTopicFormat,
s.validateBeaconBlockPubSub,
s.beaconBlockSubscriber,
)
s.subscribe(
"/eth2/%x/beacon_aggregate_and_proof",
p2p.AggregateAndProofSubnetTopicFormat,
s.validateAggregateAndProof,
s.beaconAggregateProofSubscriber,
)
s.subscribe(
"/eth2/%x/voluntary_exit",
p2p.ExitSubnetTopicFormat,
s.validateVoluntaryExit,
s.voluntaryExitSubscriber,
)
s.subscribe(
"/eth2/%x/proposer_slashing",
p2p.ProposerSlashingSubnetTopicFormat,
s.validateProposerSlashing,
s.proposerSlashingSubscriber,
)
s.subscribe(
"/eth2/%x/attester_slashing",
p2p.AttesterSlashingSubnetTopicFormat,
s.validateAttesterSlashing,
s.attesterSlashingSubscriber,
)

View File

@@ -16,6 +16,7 @@ go_library(
visibility = ["//endtoend:__subpackages__"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//endtoend/params:go_default_library",
"//endtoend/types:go_default_library",
"//shared/bytesutil:go_default_library",

View File

@@ -12,6 +12,7 @@ import (
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
e2e "github.com/prysmaticlabs/prysm/endtoend/params"
"github.com/prysmaticlabs/prysm/endtoend/types"
"github.com/prysmaticlabs/prysm/shared/p2putils"
@@ -49,23 +50,28 @@ var metricLessThanTests = []equalityTest{
},
}
const (
p2pFailValidationTopic = "p2p_message_failed_validation_total{topic=\"%s/ssz_snappy\"}"
p2pReceivedTotalTopic = "p2p_message_received_total{topic=\"%s/ssz_snappy\"}"
)
var metricComparisonTests = []comparisonTest{
{
name: "beacon aggregate and proof",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/beacon_aggregate_and_proof/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/beacon_aggregate_and_proof/ssz_snappy\"}",
topic1: fmt.Sprintf(p2pFailValidationTopic, p2p.AggregateAndProofSubnetTopicFormat),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, p2p.AggregateAndProofSubnetTopicFormat),
expectedComparison: 0.8,
},
{
name: "committee index 0 beacon attestation",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/beacon_attestation_0/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/beacon_attestation_0/ssz_snappy\"}",
topic1: fmt.Sprintf(p2pFailValidationTopic, fmt.Sprintf(formatTopic(p2p.AttestationSubnetTopicFormat), 0)),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, fmt.Sprintf(formatTopic(p2p.AttestationSubnetTopicFormat), 0)),
expectedComparison: 0.15,
},
{
name: "committee index 1 beacon attestation",
topic1: "p2p_message_failed_validation_total{topic=\"/eth2/%x/beacon_attestation_1/ssz_snappy\"}",
topic2: "p2p_message_received_total{topic=\"/eth2/%x/beacon_attestation_1/ssz_snappy\"}",
topic1: fmt.Sprintf(p2pFailValidationTopic, fmt.Sprintf(formatTopic(p2p.AttestationSubnetTopicFormat), 1)),
topic2: fmt.Sprintf(p2pReceivedTotalTopic, fmt.Sprintf(formatTopic(p2p.AttestationSubnetTopicFormat), 1)),
expectedComparison: 0.15,
},
{
@@ -207,3 +213,11 @@ func getValueOfTopic(pageContent string, topic string) (int, error) {
}
return int(floatResult), nil
}
func formatTopic(topic string) string {
startIndex := strings.Index(topic, "%x")
if startIndex == -1 {
return topic
}
return topic[:startIndex] + "%" + topic[startIndex:]
}