Add in P2P Changes (#9390)

* add in initial changes

* add test method

* raul's review

* Update beacon-chain/p2p/gossip_scoring_params.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* Update beacon-chain/p2p/gossip_scoring_params.go

Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>

* preston's review

* kasey's review

* only 1

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
This commit is contained in:
Nishant Das
2021-08-27 09:34:20 +08:00
committed by GitHub
parent 031830baa4
commit 4d02329cd5
25 changed files with 748 additions and 79 deletions

View File

@@ -62,6 +62,11 @@ func (mb *mockBroadcaster) BroadcastAttestation(_ context.Context, _ uint64, _ *
return nil
}
func (mb *mockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
mb.broadcastCalled = true
return nil
}
var _ p2p.Broadcaster = (*mockBroadcaster)(nil)
func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service {

View File

@@ -11,6 +11,7 @@ go_library(
"discovery.go",
"doc.go",
"fork.go",
"fork_watcher.go",
"gossip_scoring_params.go",
"gossip_topic_mappings.go",
"handshake.go",
@@ -39,6 +40,7 @@ go_library(
],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
@@ -69,8 +71,10 @@ go_library(
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_kr_pretty//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//config:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/protocol/identify:go_default_library",
@@ -148,6 +152,7 @@ go_test(
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"//shared/version:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",

View File

@@ -7,8 +7,12 @@ import (
"reflect"
"time"
ssz "github.com/ferranbt/fastssz"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/traceutil"
@@ -20,7 +24,8 @@ import (
// GossipTypeMapping.
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
// Broadcast a message to the p2p network.
// Broadcasts a message to the p2p network; the message is assumed to be
// broadcast on the current fork.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
@@ -41,10 +46,15 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
return s.broadcastObject(ctx, msg, fmt.Sprintf(topic, forkDigest))
castMsg, ok := msg.(ssz.Marshaler)
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
}
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
}
// BroadcastAttestation broadcasts an attestation to the p2p network.
// BroadcastAttestation broadcasts an attestation to the p2p network; the message is assumed to be
// broadcast on the current fork.
func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *eth.Attestation) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastAttestation")
defer span.End()
@@ -61,6 +71,24 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
return nil
}
// BroadcastSyncCommitteeMessage broadcasts a sync committee message to the p2p network; the message is assumed to be
// broadcast on the current fork.
func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastSyncCommitteeMessage")
defer span.End()
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "could not retrieve fork digest")
traceutil.AnnotateError(span, err)
return err
}
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.broadcastSyncCommittee(ctx, subnet, sMsg, forkDigest)
return nil
}
func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *eth.Attestation, forkDigest [4]byte) {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastAttestation")
defer span.End()
@@ -100,6 +128,13 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
traceutil.AnnotateError(span, err)
}
}
// In the event our attestation is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
currSlot := helpers.CurrentSlot(uint64(s.genesisTime.Unix()))
if att.Data.Slot+params.BeaconConfig().SlotsPerEpoch < currSlot {
log.Warnf("Attestation is too old to broadcast, discarding it. Current Slot: %d , Attestation Slot: %d", currSlot, att.Data.Slot)
return
}
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
log.WithError(err).Error("Failed to broadcast attestation")
@@ -107,8 +142,63 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
}
}
func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [4]byte) {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastSyncCommittee")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
oneSlot := time.Duration(1*params.BeaconConfig().SecondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()
// Ensure we have peers with this subnet.
// This adds in a special value to the subnet
// to ensure that we can re-use the same subnet locker.
wrappedSubIdx := subnet + syncLockerVal
s.subnetLocker(wrappedSubIdx).RLock()
hasPeer := s.hasPeerWithSubnet(syncCommitteeToTopic(subnet, forkDigest))
s.subnetLocker(wrappedSubIdx).RUnlock()
span.AddAttributes(
trace.BoolAttribute("hasPeer", hasPeer),
trace.Int64Attribute("slot", int64(sMsg.Slot)),
trace.Int64Attribute("subnet", int64(subnet)),
)
if !hasPeer {
syncCommitteeBroadcastAttempts.Inc()
if err := func() error {
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1)
if err != nil {
return err
}
if ok {
savedSyncCommitteeBroadcasts.Inc()
return nil
}
return errors.New("failed to find peers for subnet")
}(); err != nil {
log.WithError(err).Error("Failed to find peers")
traceutil.AnnotateError(span, err)
}
}
// In the event our sync message is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
if err := altair.ValidateSyncMessageTime(sMsg.Slot, s.genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
log.Warnf("Sync Committee Message is too old to broadcast, discarding it. %v", err)
return
}
if err := s.broadcastObject(ctx, sMsg, syncCommitteeToTopic(subnet, forkDigest)); err != nil {
log.WithError(err).Error("Failed to broadcast sync committee message")
traceutil.AnnotateError(span, err)
}
}
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj interface{}, topic string) error {
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
_, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -126,7 +216,6 @@ func (s *Service) broadcastObject(ctx context.Context, obj interface{}, topic st
messageLen := int64(buf.Len())
span.AddMessageSendEvent(int64(id), messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
err := errors.Wrap(err, "could not publish message")
traceutil.AnnotateError(span, err)
@@ -138,3 +227,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj interface{}, topic st
func attestationToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(AttestationSubnetTopicFormat, forkDigest, subnet)
}
func syncCommitteeToTopic(subnet uint64, forkDigest [4]byte) string {
return fmt.Sprintf(SyncCommitteeSubnetTopicFormat, forkDigest, subnet)
}
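
A minimal, self-contained sketch of the freshness guard added to broadcastAttestation above: if the attestation slot is more than an epoch behind the wall-clock slot derived from genesis time, the message is dropped rather than broadcast. The slot and epoch constants and helper names below are simplified stand-ins, not Prysm's own types.

```go
// Sketch of the freshness check, using stand-in constants rather than
// params.BeaconConfig().
package main

import (
	"fmt"
	"time"
)

const (
	secondsPerSlot = 12
	slotsPerEpoch  = 32
)

// currentSlot derives the wall-clock slot from the genesis time.
func currentSlot(genesis time.Time) uint64 {
	if time.Now().Before(genesis) {
		return 0
	}
	return uint64(time.Since(genesis).Seconds()) / secondsPerSlot
}

// tooOldToBroadcast mirrors the guard in the diff: an attestation more than
// one epoch behind the current slot is dropped instead of broadcast.
func tooOldToBroadcast(attSlot uint64, genesis time.Time) bool {
	return attSlot+slotsPerEpoch < currentSlot(genesis)
}

func main() {
	genesis := time.Now().Add(-10 * time.Minute) // roughly 50 slots ago
	fmt.Println(tooOldToBroadcast(3, genesis))   // true: more than an epoch old
	fmt.Println(tooOldToBroadcast(45, genesis))  // false: still fresh
}
```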

View File

@@ -18,8 +18,8 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
statepb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -46,7 +46,7 @@ func TestService_Broadcast(t *testing.T) {
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
}
msg := &statepb.Fork{
msg := &ethpb.Fork{
Epoch: 55,
CurrentVersion: []byte("fooo"),
PreviousVersion: []byte("barr"),
@@ -77,7 +77,7 @@ func TestService_Broadcast(t *testing.T) {
incomingMessage, err := sub.Next(ctx)
require.NoError(t, err)
result := &statepb.Fork{}
result := &ethpb.Fork{}
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
if !proto.Equal(result, msg) {
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
@@ -365,3 +365,66 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
t.Error("Failed to receive pubsub within 4s")
}
}
func TestService_BroadcastSyncCommittee(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.BHost.Network().Peers()) == 0 {
t.Fatal("No peers")
}
p := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
ScorerParams: &scorers.Config{},
}),
}
msg := testutil.HydrateSyncCommittee(&pb.SyncCommitteeMessage{})
subnet := uint64(5)
topic := SyncCommitteeSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(msg)] = topic
digest, err := p.currentForkDigest()
require.NoError(t, err)
topic = fmt.Sprintf(topic, digest, subnet)
// External peer subscribes to the topic.
topic += p.Encoding().ProtocolSuffix()
sub, err := p2.SubscribeToTopic(topic)
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
// Async listen for the pubsub, must be before the broadcast.
var wg sync.WaitGroup
wg.Add(1)
go func(tt *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
incomingMessage, err := sub.Next(ctx)
require.NoError(t, err)
result := &pb.SyncCommitteeMessage{}
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
if !proto.Equal(result, msg) {
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
}
}(t)
// Broadcast to peers and wait.
require.NoError(t, p.BroadcastSyncCommitteeMessage(context.Background(), subnet, msg))
if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Error("Failed to receive pubsub within 1s")
}
}

View File

@@ -16,6 +16,9 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/version"
)
// Listener defines the discovery V5 network interface that is used
@@ -36,7 +39,7 @@ type Listener interface {
// to be dynamically discoverable by others given our tracked committee ids.
func (s *Service) RefreshENR() {
// return early if discv5 isn't running
if s.dv5Listener == nil {
if s.dv5Listener == nil || !s.isInitialized() {
return
}
bitV := bitfield.NewBitvector64()
@@ -46,14 +49,41 @@ func (s *Service) RefreshENR() {
}
currentBitV, err := attBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.Errorf("Could not retrieve bitfield: %v", err)
log.Errorf("Could not retrieve att bitfield: %v", err)
return
}
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
// Compare current epoch with our fork epochs
currEpoch := helpers.SlotToEpoch(helpers.CurrentSlot(uint64(s.genesisTime.Unix())))
altairForkEpoch := params.BeaconConfig().AltairForkEpoch
switch {
// Altair Behaviour
case currEpoch >= altairForkEpoch:
// Retrieve sync subnets from application level
// cache.
bitS := bitfield.Bitvector4{byte(0x00)}
committees = cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
for _, idx := range committees {
bitS.SetBitAt(idx, true)
}
currentBitS, err := syncBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.Errorf("Could not retrieve sync bitfield: %v", err)
return
}
if bytes.Equal(bitV, currentBitV) && bytes.Equal(bitS, currentBitS) &&
s.Metadata().Version() == version.Altair {
// return early if bitfields haven't changed
return
}
s.updateSubnetRecordWithMetadataV2(bitV, bitS)
default:
// Phase 0 behaviour.
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
return
}
s.updateSubnetRecordWithMetadata(bitV)
}
s.updateSubnetRecordWithMetadata(bitV)
// ping all peers to inform them of new metadata
s.pingPeers()
}
@@ -206,7 +236,8 @@ func (s *Service) createLocalNode(
if err != nil {
return nil, errors.Wrap(err, "could not add eth2 fork version entry to enr")
}
return initializeAttSubnets(localNode), nil
localNode = initializeAttSubnets(localNode)
return initializeSyncCommSubnets(localNode), nil
}
func (s *Service) startDiscoveryV5(
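
The RefreshENR change above branches on the current epoch relative to the Altair fork epoch. The simplified sketch below (stand-in types rather than Prysm's metadata and bitfield packages) shows the decision being made: past the fork, the attestation bitvector, the sync-committee bitvector, and the metadata version must all be unchanged before the update is skipped; before it, only the attestation bitvector is compared.

```go
// Fork-aware "do we need to refresh?" check, with illustrative types.
package main

import (
	"bytes"
	"fmt"
)

const versionAltair = 1 // stand-in for version.Altair

type metadataRecord struct {
	version  int
	attnets  []byte // 64-bit attestation subnet bitvector
	syncnets []byte // 4-bit sync committee subnet bitvector
}

// needsRefresh reports whether the local ENR/metadata should be rewritten.
func needsRefresh(currEpoch, altairForkEpoch uint64, local metadataRecord, wantAtt, wantSync []byte) bool {
	if currEpoch >= altairForkEpoch {
		return !bytes.Equal(wantAtt, local.attnets) ||
			!bytes.Equal(wantSync, local.syncnets) ||
			local.version != versionAltair
	}
	// Phase 0: only the attestation bitvector matters.
	return !bytes.Equal(wantAtt, local.attnets)
}

func main() {
	local := metadataRecord{version: 0, attnets: make([]byte, 8), syncnets: []byte{0x00}}
	// Past the fork, the stale metadata version alone forces a refresh.
	fmt.Println(needsRefresh(6, 5, local, make([]byte, 8), []byte{0x00})) // true
	// Before the fork, an unchanged attnets bitvector means no refresh.
	fmt.Println(needsRefresh(3, 5, local, make([]byte, 8), []byte{0x00})) // false
}
```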

View File

@@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/go-bitfield"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
@@ -33,8 +34,10 @@ import (
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/iputils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/shared/version"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -340,3 +343,162 @@ func addPeer(t *testing.T, p *peers.Status, state peerdata.PeerConnectionState)
}))
return id
}
func TestRefreshENR_ForkBoundaries(t *testing.T) {
params.SetupTestConfigCleanup(t)
// Clean up caches after usage.
defer cache.SubnetIDs.EmptyAllCaches()
tests := []struct {
name string
svcBuilder func(t *testing.T) *Service
postValidation func(t *testing.T, s *Service)
}{
{
name: "metadata no change",
svcBuilder: func(t *testing.T) *Service {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
cfg: &Config{UDPPort: uint(port)},
}
listener, err := s.createListener(ipAddr, pkey)
assert.NoError(t, err)
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(pb.MetaDataV0))
s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
return s
},
postValidation: func(t *testing.T, s *Service) {
assert.DeepEqual(t, bitfield.NewBitvector64(), s.metaData.AttnetsBitfield())
},
},
{
name: "metadata updated",
svcBuilder: func(t *testing.T) *Service {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
genesisTime: time.Now(),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
cfg: &Config{UDPPort: uint(port)},
}
listener, err := s.createListener(ipAddr, pkey)
assert.NoError(t, err)
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(pb.MetaDataV0))
s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01})
cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0)
return s
},
postValidation: func(t *testing.T, s *Service) {
assert.DeepEqual(t, bitfield.Bitvector64{0xe, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0}, s.metaData.AttnetsBitfield())
},
},
{
name: "metadata updated at fork epoch",
svcBuilder: func(t *testing.T) *Service {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
genesisTime: time.Now().Add(-5 * oneEpochDuration()),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
cfg: &Config{UDPPort: uint(port)},
}
listener, err := s.createListener(ipAddr, pkey)
assert.NoError(t, err)
// Update params
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 5
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(pb.MetaDataV0))
s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01})
cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0)
return s
},
postValidation: func(t *testing.T, s *Service) {
assert.Equal(t, version.Altair, s.metaData.Version())
assert.DeepEqual(t, bitfield.Bitvector4{0x00}, s.metaData.MetadataObjV1().Syncnets)
assert.DeepEqual(t, bitfield.Bitvector64{0xe, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0}, s.metaData.AttnetsBitfield())
},
},
{
name: "metadata updated at fork epoch with no bitfield",
svcBuilder: func(t *testing.T) *Service {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
genesisTime: time.Now().Add(-5 * oneEpochDuration()),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
cfg: &Config{UDPPort: uint(port)},
}
listener, err := s.createListener(ipAddr, pkey)
assert.NoError(t, err)
// Update params
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 5
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(pb.MetaDataV0))
s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
return s
},
postValidation: func(t *testing.T, s *Service) {
assert.Equal(t, version.Altair, s.metaData.Version())
assert.DeepEqual(t, bitfield.Bitvector4{0x00}, s.metaData.MetadataObjV1().Syncnets)
assert.DeepEqual(t, bitfield.Bitvector64{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, s.metaData.AttnetsBitfield())
},
},
{
name: "metadata updated past fork epoch with bitfields",
svcBuilder: func(t *testing.T) *Service {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
s := &Service{
genesisTime: time.Now().Add(-6 * oneEpochDuration()),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
cfg: &Config{UDPPort: uint(port)},
}
listener, err := s.createListener(ipAddr, pkey)
assert.NoError(t, err)
// Update params
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 5
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
s.dv5Listener = listener
s.metaData = wrapper.WrappedMetadataV0(new(pb.MetaDataV0))
s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0)
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte{'A'}, 0, []uint64{0, 1}, 0)
return s
},
postValidation: func(t *testing.T, s *Service) {
assert.Equal(t, version.Altair, s.metaData.Version())
assert.DeepEqual(t, bitfield.Bitvector4{0x03}, s.metaData.MetadataObjV1().Syncnets)
assert.DeepEqual(t, bitfield.Bitvector64{0xe, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0}, s.metaData.AttnetsBitfield())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := tt.svcBuilder(t)
s.RefreshENR()
tt.postValidation(t, s)
s.dv5Listener.Close()
cache.SubnetIDs.EmptyAllCaches()
cache.SyncSubnetIDs.EmptyAllCaches()
})
}
}

View File

@@ -0,0 +1,34 @@
package p2p
import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/slotutil"
)
// A background routine which listens for new and upcoming forks and
// updates the node's discovery service to reflect any new fork version
// changes.
func (s *Service) forkWatcher() {
slotTicker := slotutil.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case currSlot := <-slotTicker.C():
currEpoch := helpers.SlotToEpoch(currSlot)
if currEpoch == params.BeaconConfig().AltairForkEpoch {
// If we are in the fork epoch, we update our enr with
// the updated fork digest. This repeats every slot
// of the fork epoch, which might be slightly wasteful
// but is fine nonetheless.
_, err := addForkEntry(s.dv5Listener.LocalNode(), s.genesisTime, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not add fork entry")
}
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
slotTicker.Done()
return
}
}
}
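
A standalone sketch of the forkWatcher loop above, using a plain time.Ticker in place of Prysm's slotutil ticker and illustrative slot/epoch constants; the real implementation takes these from params.BeaconConfig() and updates the discv5 local node's fork entry.

```go
// Slot-ticker fork watcher, simplified and self-contained.
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	secondsPerSlot  = 12
	slotsPerEpoch   = 32
	altairForkEpoch = 74240 // illustrative value only
)

func forkWatcher(ctx context.Context, genesis time.Time, updateForkEntry func()) {
	ticker := time.NewTicker(secondsPerSlot * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			currSlot := uint64(time.Since(genesis).Seconds()) / secondsPerSlot
			if currSlot/slotsPerEpoch == altairForkEpoch {
				// Re-write the fork entry once per slot during the fork epoch;
				// repeating it is slightly wasteful but harmless.
				updateForkEntry()
			}
		case <-ctx.Done():
			fmt.Println("context closed, exiting goroutine")
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	forkWatcher(ctx, time.Now(), func() { fmt.Println("updating ENR fork entry") })
}
```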

View File

@@ -21,9 +21,15 @@ const (
// aggregateWeight specifies the scoring weight that we apply to
// our aggregate topic.
aggregateWeight = 0.5
// syncContributionWeight specifies the scoring weight that we apply to
// our sync contribution topic.
syncContributionWeight = 0.2
// attestationTotalWeight specifies the scoring weight that we apply to
// our attestation subnet topic.
attestationTotalWeight = 1
// syncCommitteesTotalWeight specifies the scoring weight that we apply to
// our sync subnet topic.
syncCommitteesTotalWeight = 0.4
// attesterSlashingWeight specifies the scoring weight that we apply to
// our attester slashing topic.
attesterSlashingWeight = 0.05
@@ -50,6 +56,12 @@ const (
var (
// a bool to check if we enable scoring for messages in the mesh sent for near first deliveries.
meshDeliveryIsScored = false
// Defines the variables representing the different time periods.
oneHundredEpochs = 100 * oneEpochDuration()
invalidDecayPeriod = 50 * oneEpochDuration()
twentyEpochs = 20 * oneEpochDuration()
tenEpochs = 10 * oneEpochDuration()
)
func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) {
@@ -72,10 +84,10 @@ func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds)
IPColocationFactorWhitelist: nil,
BehaviourPenaltyWeight: -15.92,
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyDecay: scoreDecay(10 * oneEpochDuration()),
DecayInterval: 1 * oneSlotDuration(),
BehaviourPenaltyDecay: scoreDecay(tenEpochs),
DecayInterval: oneSlotDuration(),
DecayToZero: decayToZero,
RetainScore: 100 * oneEpochDuration(),
RetainScore: oneHundredEpochs,
}
return scoreParams, thresholds
}
@@ -86,17 +98,21 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
return nil, err
}
switch {
case strings.Contains(topic, "beacon_block"):
case strings.Contains(topic, GossipBlockMessage):
return defaultBlockTopicParams(), nil
case strings.Contains(topic, "beacon_aggregate_and_proof"):
case strings.Contains(topic, GossipAggregateAndProofMessage):
return defaultAggregateTopicParams(activeValidators)
case strings.Contains(topic, "beacon_attestation"):
case strings.Contains(topic, GossipAttestationMessage):
return defaultAggregateSubnetTopicParams(activeValidators)
case strings.Contains(topic, "voluntary_exit"):
case strings.Contains(topic, GossipSyncCommitteeMessage):
return defaultSyncSubnetTopicParams(activeValidators)
case strings.Contains(topic, GossipContributionAndProofMessage):
return defaultSyncContributionTopicParams()
case strings.Contains(topic, GossipExitMessage):
return defaultVoluntaryExitTopicParams(), nil
case strings.Contains(topic, "proposer_slashing"):
case strings.Contains(topic, GossipProposerSlashingMessage):
return defaultProposerSlashingTopicParams(), nil
case strings.Contains(topic, "attester_slashing"):
case strings.Contains(topic, GossipAttesterSlashingMessage):
return defaultAttesterSlashingTopicParams(), nil
default:
return nil, errors.Errorf("unrecognized topic provided for parameter registration: %s", topic)
@@ -158,7 +174,7 @@ func defaultBlockTopicParams() *pubsub.TopicScoreParams {
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: scoreDecay(20 * oneEpochDuration()),
FirstMessageDeliveriesDecay: scoreDecay(twentyEpochs),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * oneEpochDuration()),
@@ -169,7 +185,7 @@ func defaultBlockTopicParams() *pubsub.TopicScoreParams {
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}
@@ -211,7 +227,49 @@ func defaultAggregateTopicParams(activeValidators uint64) (*pubsub.TopicScorePar
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -maxScore() / aggregateWeight,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}, nil
}
func defaultSyncContributionTopicParams() (*pubsub.TopicScoreParams, error) {
// Determine the expected message rate for the particular gossip topic.
aggPerSlot := params.BeaconConfig().SyncCommitteeSubnetCount * params.BeaconConfig().TargetAggregatorsPerSyncSubcommittee
firstMessageCap, err := decayLimit(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot*2/gossipSubD))
if err != nil {
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
meshThreshold, err := decayThreshold(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot)/dampeningFactor)
if err != nil {
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
}
meshWeight := -scoreByWeight(syncContributionWeight, meshThreshold)
meshCap := 4 * meshThreshold
if !meshDeliveryIsScored {
// Set the mesh weight as zero as a temporary measure, so as to prevent
// the average nodes from being penalised.
meshWeight = 0
}
return &pubsub.TopicScoreParams{
TopicWeight: syncContributionWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: firstMessageWeight,
FirstMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
FirstMessageDeliveriesCap: firstMessageCap,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
MeshMessageDeliveriesCap: meshCap,
MeshMessageDeliveriesThreshold: meshThreshold,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -maxScore() / syncContributionWeight,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}, nil
}
@@ -238,8 +296,13 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicSc
firstDecay = 4
meshDecay = 16
}
rate := numPerSlot * 2 / gossipSubD
if rate == 0 {
log.Warn("rate is 0, skipping initializing topic scoring")
return nil, nil
}
// Determine expected first deliveries based on the message rate.
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(numPerSlot*2/gossipSubD))
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(rate))
if err != nil {
log.Warnf("skipping initializing topic scoring: %v", err)
return nil, nil
@@ -275,7 +338,70 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicSc
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -maxScore() / topicWeight,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}, nil
}
func defaultSyncSubnetTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) {
subnetCount := params.BeaconConfig().SyncCommitteeSubnetCount
// Get weight for each specific subnet.
topicWeight := syncCommitteesTotalWeight / float64(subnetCount)
syncComSize := params.BeaconConfig().SyncCommitteeSize
// Set the max as the sync committee size
if activeValidators > syncComSize {
activeValidators = syncComSize
}
subnetWeight := activeValidators / subnetCount
if subnetWeight == 0 {
log.Warn("Subnet weight is 0, skipping initializing topic scoring")
return nil, nil
}
firstDecay := time.Duration(1)
meshDecay := time.Duration(4)
rate := subnetWeight * 2 / gossipSubD
if rate == 0 {
log.Warn("rate is 0, skipping initializing topic scoring")
return nil, nil
}
// Determine expected first deliveries based on the message rate.
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(rate))
if err != nil {
log.WithError(err).Warn("Skipping initializing topic scoring")
return nil, nil
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
// Determine expected mesh deliveries based on message rate applied with a dampening factor.
meshThreshold, err := decayThreshold(scoreDecay(meshDecay*oneEpochDuration()), float64(subnetWeight)/dampeningFactor)
if err != nil {
log.WithError(err).Warn("Skipping initializing topic scoring")
return nil, nil
}
meshWeight := -scoreByWeight(topicWeight, meshThreshold)
meshCap := 4 * meshThreshold
if !meshDeliveryIsScored {
// Set the mesh weight as zero as a temporary measure, so as to prevent
// the average nodes from being penalised.
meshWeight = 0
}
return &pubsub.TopicScoreParams{
TopicWeight: topicWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: firstMessageWeight,
FirstMessageDeliveriesDecay: scoreDecay(firstDecay * oneEpochDuration()),
FirstMessageDeliveriesCap: firstMessageCap,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(meshDecay * oneEpochDuration()),
MeshMessageDeliveriesCap: meshCap,
MeshMessageDeliveriesThreshold: meshThreshold,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -maxScore() / topicWeight,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}, nil
}
@@ -286,7 +412,7 @@ func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 36,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 1,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
@@ -297,7 +423,7 @@ func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}
@@ -308,7 +434,7 @@ func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 36,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 1,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
@@ -319,7 +445,7 @@ func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}
@@ -330,7 +456,7 @@ func defaultVoluntaryExitTopicParams() *pubsub.TopicScoreParams {
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 2,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 5,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
@@ -341,7 +467,7 @@ func defaultVoluntaryExitTopicParams() *pubsub.TopicScoreParams {
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}
@@ -401,8 +527,9 @@ func scoreByWeight(weight, threshold float64) float64 {
// maxScore attainable by a peer.
func maxScore() float64 {
totalWeight := beaconBlockWeight + aggregateWeight + attestationTotalWeight +
attesterSlashingWeight + proposerSlashingWeight + voluntaryExitWeight
totalWeight := beaconBlockWeight + aggregateWeight + syncContributionWeight +
attestationTotalWeight + syncCommitteesTotalWeight + attesterSlashingWeight +
proposerSlashingWeight + voluntaryExitWeight
return (maxInMeshScore + maxFirstDeliveryScore) * totalWeight
}
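
Rough sketch of the decay arithmetic behind the new named periods (tenEpochs, twentyEpochs, oneHundredEpochs, invalidDecayPeriod), assuming the usual gossipsub convention: scoreDecay picks a per-interval factor d so that a unit score reaches decayToZero after the stated duration, and a constant delivery rate r per interval converges to r/(1-d). The constants and helpers below are illustrative, not the file's actual values.

```go
// Back-of-the-envelope gossipsub score decay math, assumed convention only.
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	secondsPerSlot = 12
	slotsPerEpoch  = 32
	decayToZero    = 0.01 // illustrative floor
)

func oneSlotDuration() time.Duration  { return secondsPerSlot * time.Second }
func oneEpochDuration() time.Duration { return secondsPerSlot * slotsPerEpoch * time.Second }

// scoreDecay returns the per-slot decay factor for a total decay period,
// with DecayInterval set to one slot as in the diff above.
func scoreDecay(total time.Duration) float64 {
	intervals := float64(total / oneSlotDuration())
	return math.Pow(decayToZero, 1/intervals)
}

// decayConvergence is the steady-state score for `rate` deliveries per slot.
func decayConvergence(decay, rate float64) float64 {
	return rate / (1 - decay)
}

func main() {
	d := scoreDecay(10 * oneEpochDuration()) // the tenEpochs constant
	fmt.Printf("per-slot decay over 10 epochs: %.5f\n", d)
	fmt.Printf("steady state at 1 msg/slot:    %.2f\n", decayConvergence(d, 1))
}
```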

View File

@@ -3,27 +3,53 @@ package p2p
import (
"reflect"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"google.golang.org/protobuf/proto"
)
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// gossipTopicMappings represents the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
BlockSubnetTopicFormat: &pb.SignedBeaconBlock{},
AttestationSubnetTopicFormat: &pb.Attestation{},
ExitSubnetTopicFormat: &pb.SignedVoluntaryExit{},
ProposerSlashingSubnetTopicFormat: &pb.ProposerSlashing{},
AttesterSlashingSubnetTopicFormat: &pb.AttesterSlashing{},
AggregateAndProofSubnetTopicFormat: &pb.SignedAggregateAttestationAndProof{},
var gossipTopicMappings = map[string]proto.Message{
BlockSubnetTopicFormat: &pb.SignedBeaconBlock{},
AttestationSubnetTopicFormat: &pb.Attestation{},
ExitSubnetTopicFormat: &pb.SignedVoluntaryExit{},
ProposerSlashingSubnetTopicFormat: &pb.ProposerSlashing{},
AttesterSlashingSubnetTopicFormat: &pb.AttesterSlashing{},
AggregateAndProofSubnetTopicFormat: &pb.SignedAggregateAttestationAndProof{},
SyncContributionAndProofSubnetTopicFormat: &ethpb.SignedContributionAndProof{},
SyncCommitteeSubnetTopicFormat: &ethpb.SyncCommitteeMessage{},
}
// GossipTopicMappings is a function to return the assigned data type
// versioned by epoch.
func GossipTopicMappings(topic string, epoch types.Epoch) proto.Message {
if topic == BlockSubnetTopicFormat && epoch >= params.BeaconConfig().AltairForkEpoch {
return &ethpb.SignedBeaconBlockAltair{}
}
return gossipTopicMappings[topic]
}
// AllTopics returns all topics stored in our
// gossip mapping.
func AllTopics() []string {
topics := []string{}
for k := range gossipTopicMappings {
topics = append(topics, k)
}
return topics
}
// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message
// can be mapped to a protocol ID string.
var GossipTypeMapping = make(map[reflect.Type]string, len(GossipTopicMappings))
var GossipTypeMapping = make(map[reflect.Type]string, len(gossipTopicMappings))
func init() {
for k, v := range GossipTopicMappings {
for k, v := range gossipTopicMappings {
GossipTypeMapping[reflect.TypeOf(v)] = k
}
// Specially handle Altair Objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockAltair{})] = BlockSubnetTopicFormat
}
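
A compact illustration of the epoch-versioned lookup GossipTopicMappings now performs: the block topic resolves to the Altair block type at or after the fork epoch and to the phase 0 type before it. The message structs here are empty stand-ins for the generated protobuf types.

```go
// Epoch-versioned topic-to-type lookup, mirroring GossipTopicMappings.
package main

import "fmt"

type (
	signedBeaconBlock       struct{}
	signedBeaconBlockAltair struct{}
)

const (
	blockTopicFormat = "/eth2/%x/beacon_block"
	altairForkEpoch  = uint64(74240) // illustrative value only
)

var topicMappings = map[string]interface{}{
	blockTopicFormat: &signedBeaconBlock{},
}

// topicMapping switches the block topic to the Altair block type at or after
// the fork epoch; everything else falls back to the static map.
func topicMapping(topic string, epoch uint64) interface{} {
	if topic == blockTopicFormat && epoch >= altairForkEpoch {
		return &signedBeaconBlockAltair{}
	}
	return topicMappings[topic]
}

func main() {
	fmt.Printf("%T\n", topicMapping(blockTopicFormat, 0))               // *main.signedBeaconBlock
	fmt.Printf("%T\n", topicMapping(blockTopicFormat, altairForkEpoch)) // *main.signedBeaconBlockAltair
}
```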

View File

@@ -3,14 +3,39 @@ package p2p
import (
"reflect"
"testing"
eth2types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestMappingHasNoDuplicates(t *testing.T) {
m := make(map[reflect.Type]bool)
for _, v := range GossipTopicMappings {
for _, v := range gossipTopicMappings {
if _, ok := m[reflect.TypeOf(v)]; ok {
t.Errorf("%T is duplicated in the topic mapping", v)
}
m[reflect.TypeOf(v)] = true
}
}
func TestGossipTopicMappings_CorrectBlockType(t *testing.T) {
params.SetupTestConfigCleanup(t)
bCfg := params.BeaconConfig()
forkEpoch := eth2types.Epoch(100)
bCfg.AltairForkEpoch = forkEpoch
bCfg.ForkVersionSchedule[bytesutil.ToBytes4(bCfg.AltairForkVersion)] = eth2types.Epoch(100)
params.OverrideBeaconConfig(bCfg)
// Before Fork
pMessage := GossipTopicMappings(BlockSubnetTopicFormat, 0)
_, ok := pMessage.(*ethpb.SignedBeaconBlock)
assert.Equal(t, true, ok)
// After Fork
pMessage = GossipTopicMappings(BlockSubnetTopicFormat, forkEpoch)
_, ok = pMessage.(*ethpb.SignedBeaconBlockAltair)
assert.Equal(t, true, ok)
}

View File

@@ -35,6 +35,7 @@ type P2P interface {
type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -25,6 +25,16 @@ var (
Name: "p2p_attestation_subnet_attempted_broadcasts",
Help: "The number of attestations that were attempted to be broadcast.",
})
savedSyncCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_recovered_broadcasts",
Help: "The number of sync committee messages that were attempted to be broadcast with no peers on " +
"the subnet. The beacon node increments this counter when the broadcast is blocked " +
"until a subnet peer can be found.",
})
syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
Help: "The number of sync committee that were attempted to be broadcast.",
})
)
func (s *Service) updateMetrics() {
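
For reference, the two counters above follow the standard prometheus/client_golang pattern: promauto.NewCounter registers the metric against the default registry and Inc() bumps it. A minimal usage sketch (it needs the client_golang module; the surrounding logic is illustrative only):

```go
// Defining and incrementing the sync-committee broadcast counters.
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	savedSyncCommitteeBroadcasts = promauto.NewCounter(prometheus.CounterOpts{
		Name: "p2p_sync_committee_subnet_recovered_broadcasts",
		Help: "Sync committee messages broadcast only after a subnet peer was found.",
	})
	syncCommitteeBroadcastAttempts = promauto.NewCounter(prometheus.CounterOpts{
		Name: "p2p_sync_committee_subnet_attempted_broadcasts",
		Help: "Sync committee messages that were attempted to be broadcast.",
	})
)

func main() {
	// On a broadcast attempt with no subnet peer available:
	syncCommitteeBroadcastAttempts.Inc()
	// After peer discovery succeeds and the message is published:
	savedSyncCommitteeBroadcasts.Inc()
}
```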

View File

@@ -39,7 +39,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/peerdata"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
metadata "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/timeutils"
@@ -235,7 +235,7 @@ func (p *Status) SetMetadata(pid peer.ID, metaData metadata.Metadata) {
defer p.store.Unlock()
peerData := p.store.PeerDataGetOrCreate(pid)
peerData.MetaData = metaData
peerData.MetaData = metaData.Copy()
}
// Metadata returns a copy of the metadata corresponding to the provided
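
The SetMetadata change above stores metaData.Copy() instead of the caller's object. The short sketch below (with a stand-in metadata type) shows the aliasing bug the copy avoids: without it, a later mutation by the caller leaks into the peer store.

```go
// Why the peer store keeps a defensive copy of metadata.
package main

import "fmt"

type metaData struct {
	seqNumber uint64
	attnets   []byte
}

// Copy returns a deep copy, including the bitfield's backing array.
func (m *metaData) Copy() *metaData {
	attnets := append([]byte(nil), m.attnets...)
	return &metaData{seqNumber: m.seqNumber, attnets: attnets}
}

func main() {
	caller := &metaData{seqNumber: 1, attnets: []byte{0x01}}

	stored := caller        // aliased: shares the caller's bitfield
	copied := caller.Copy() // defensive copy, as in the diff

	caller.attnets[0] = 0xFF // caller mutates its own object later

	fmt.Printf("aliased: %#x, copied: %#x\n", stored.attnets[0], copied.attnets[0]) // 0xff vs 0x1
}
```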

View File

@@ -8,11 +8,18 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
)
var _ pubsub.SubscriptionFilter = (*Service)(nil)
const pubsubSubscriptionRequestLimit = 100
// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
@@ -35,7 +42,12 @@ func (s *Service) CanSubscribe(topic string) bool {
log.WithError(err).Error("Could not determine fork digest")
return false
}
if parts[2] != fmt.Sprintf("%x", fd) {
digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine next fork digest")
return false
}
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
return false
}
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
@@ -43,7 +55,7 @@ func (s *Service) CanSubscribe(topic string) bool {
}
// Check the incoming topic matches any topic mapping. This includes a check for part[3].
for gt := range GossipTopicMappings {
for gt := range gossipTopicMappings {
if _, err := scanfcheck(strings.Join(parts[0:4], "/"), gt); err == nil {
return true
}
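
Worked arithmetic for raising pubsubSubscriptionRequestLimit to 200: at a fork boundary every topic may be subscribed under both the old and the new fork digest, so the worst case implied by the comment above is roughly (64 + 4 + 6) * 2 = 148 subscriptions, and 200 leaves some headroom. The short check below just spells that out.

```go
// Upper bound on simultaneous subscriptions at a fork boundary.
package main

import "fmt"

func main() {
	const (
		attSubnets   = 64
		syncSubnets  = 4
		globalTopics = 6 // block, aggregate, proposer/attester slashing, exits, sync contribution
		forkDigests  = 2 // old and new digest during the transition
	)
	worstCase := (attSubnets + syncSubnets + globalTopics) * forkDigests
	fmt.Println(worstCase, worstCase <= 200) // 148 true
}
```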

View File

@@ -86,11 +86,11 @@ func TestService_CanSubscribe(t *testing.T) {
}
// Ensure all gossip topic mappings pass validation.
for topic := range GossipTopicMappings {
for _, topic := range AllTopics() {
formatting := []interface{}{digest}
// Special case for attestation subnets which have a second formatting placeholder.
if topic == AttestationSubnetTopicFormat {
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat {
formatting = append(formatting, 0 /* some subnet ID */)
}
@@ -193,7 +193,7 @@ func Test_scanfcheck(t *testing.T) {
func TestGossipTopicMapping_scanfcheck_GossipTopicFormattingSanityCheck(t *testing.T) {
// scanfcheck only supports integer based substitutions at the moment. Any others will
// inaccurately fail validation.
for topic := range GossipTopicMappings {
for _, topic := range AllTopics() {
t.Run(topic, func(t *testing.T) {
for i, c := range topic {
if string(c) == "%" {
@@ -356,5 +356,4 @@ func TestService_MonitorsStateForkUpdates(t *testing.T) {
time.Sleep(50 * time.Millisecond)
require.True(t, s.isInitialized())
require.NotEmpty(t, s.currentForkDigest)
}

View File

@@ -3,10 +3,14 @@ package p2p
import (
"context"
ssz "github.com/ferranbt/fastssz"
"github.com/kr/pretty"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -23,6 +27,11 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
topic := baseTopic + s.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))
log.WithFields(logrus.Fields{
"topic": topic,
"request": pretty.Sprint(message),
}).Tracef("Sending RPC request to peer %s", pid.String())
// Apply max dial timeout when opening a new stream.
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()
@@ -33,8 +42,12 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic != RPCMetaDataTopicV1 {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
if baseTopic != RPCMetaDataTopicV1 && baseTopic != RPCMetaDataTopicV2 {
castedMsg, ok := message.(ssz.Marshaler)
if !ok {
return nil, errors.Errorf("%T does not support the ssz marshaller interface", message)
}
if _, err := s.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil {
traceutil.AnnotateError(span, err)
_err := stream.Reset()
_ = _err

View File

@@ -26,6 +26,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -93,7 +94,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
cancel: cancel,
cfg: cfg,
isPreGenesis: true,
joinedTopics: make(map[string]*pubsub.Topic, len(GossipTopicMappings)),
joinedTopics: make(map[string]*pubsub.Topic, len(gossipTopicMappings)),
subnetsLock: make(map[uint64]*sync.RWMutex),
}
@@ -128,7 +129,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
s.host = h
s.host.RemoveStreamHandler(identify.IDDelta)
// Gossipsub registration is done before we add in any new peers
// due to libp2p's gossipsub implementation not taking into
// account previously added peers when creating the gossipsub
@@ -167,6 +167,9 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
},
})
// Initialize Data maps.
types.InitializeDataMaps()
return s, nil
}
@@ -220,6 +223,9 @@ func (s *Service) Start() {
}
s.connectWithAllPeers(addrs)
}
// Initialize metadata according to the
// current epoch.
s.RefreshENR()
// Periodic functions.
runutil.RunEvery(s.ctx, params.BeaconNetworkConfig().TtfbTimeout, func() {
@@ -253,6 +259,7 @@ func (s *Service) Start() {
if p2pHostDNS != "" {
logExternalDNSAddr(s.host.ID(), p2pHostDNS, p2pTCPPort)
}
go s.forkWatcher()
}
// Stop the p2p service and terminate all peer connections.

View File

@@ -26,6 +26,7 @@ go_library(
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_libp2p_go_libp2p_blankhost//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
"@com_github_libp2p_go_libp2p_core//connmgr:go_default_library",

View File

@@ -138,6 +138,11 @@ func (p *FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
return nil
}
// BroadcastSyncCommitteeMessage -- fake.
func (b *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
return nil
}
// InterceptPeerDial -- fake.
func (p *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true

View File

@@ -27,3 +27,9 @@ func (m *MockBroadcaster) BroadcastAttestation(_ context.Context, _ uint64, a *e
m.BroadcastAttestations = append(m.BroadcastAttestations, a)
return nil
}
// BroadcastSyncCommitteeMessage records a broadcast occurred.
func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
m.BroadcastCalled = true
return nil
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
ssz "github.com/ferranbt/fastssz"
bhost "github.com/libp2p/go-libp2p-blankhost"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/control"
@@ -29,6 +30,11 @@ import (
"google.golang.org/protobuf/proto"
)
// We have to declare this again here to prevent a circular dependency
// with the main p2p package.
const metatadataV1Topic = "/eth2/beacon_chain/req/metadata/1"
const metatadataV2Topic = "/eth2/beacon_chain/req/metadata/2"
// TestP2P represents a p2p implementation that can be used for testing.
type TestP2P struct {
t *testing.T
@@ -99,7 +105,11 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
}
}()
n, err := p.Encoding().EncodeWithMaxLength(s, msg)
castedMsg, ok := msg.(ssz.Marshaler)
if !ok {
p.t.Fatalf("%T doesnt support ssz marshaler", msg)
}
n, err := p.Encoding().EncodeWithMaxLength(s, castedMsg)
if err != nil {
_err := s.Reset()
_ = _err
@@ -127,8 +137,12 @@ func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
// pick up the newly connected peer.
time.Sleep(time.Millisecond * 100)
castedMsg, ok := msg.(ssz.Marshaler)
if !ok {
p.t.Fatalf("%T doesnt support ssz marshaler", msg)
}
buf := new(bytes.Buffer)
if _, err := p.Encoding().EncodeGossip(buf, msg); err != nil {
if _, err := p.Encoding().EncodeGossip(buf, castedMsg); err != nil {
p.t.Fatalf("Failed to encode message: %v", err)
}
digest, err := p.ForkDigest()
@@ -156,6 +170,12 @@ func (p *TestP2P) BroadcastAttestation(_ context.Context, _ uint64, _ *ethpb.Att
return nil
}
// BroadcastSyncCommitteeMessage broadcasts a sync committee message.
func (p *TestP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
p.BroadcastCalled = true
return nil
}
// SetStreamHandler for RPC.
func (p *TestP2P) SetStreamHandler(topic string, handler network.StreamHandler) {
p.BHost.SetStreamHandler(protocol.ID(topic), handler)
@@ -292,8 +312,12 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p
return nil, err
}
if topic != "/eth2/beacon_chain/req/metadata/1" {
if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil {
if topic != metatadataV1Topic && topic != metatadataV2Topic {
castedMsg, ok := msg.(ssz.Marshaler)
if !ok {
p.t.Fatalf("%T doesnt support ssz marshaler", msg)
}
if _, err := p.Encoding().EncodeWithMaxLength(stream, castedMsg); err != nil {
_err := stream.Reset()
_ = _err
return nil, err

View File

@@ -1,6 +1,11 @@
package p2p
const (
// GossipProtocolAndDigest represents the protocol and fork digest prefix in a gossip topic.
GossipProtocolAndDigest = "/eth2/%x/"
// Message Types
//
// GossipAttestationMessage is the name for the attestation message type. It is
// specially extracted so as to determine the correct message type from an attestation
// subnet.
@@ -9,20 +14,35 @@ const (
// specially extracted so as to determine the correct message type from a sync committee
// subnet.
GossipSyncCommitteeMessage = "sync_committee"
// GossipBlockMessage is the name for the block message type.
GossipBlockMessage = "beacon_block"
// GossipExitMessage is the name for the voluntary exit message type.
GossipExitMessage = "voluntary_exit"
// GossipProposerSlashingMessage is the name for the proposer slashing message type.
GossipProposerSlashingMessage = "proposer_slashing"
// GossipAttesterSlashingMessage is the name for the attester slashing message type.
GossipAttesterSlashingMessage = "attester_slashing"
// GossipAggregateAndProofMessage is the name for the attestation aggregate and proof message type.
GossipAggregateAndProofMessage = "beacon_aggregate_and_proof"
// GossipContributionAndProofMessage is the name for the sync contribution and proof message type.
GossipContributionAndProofMessage = "sync_committee_contribution_and_proof"
// Topic Formats
//
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
AttestationSubnetTopicFormat = "/eth2/%x/" + GossipAttestationMessage + "_%d"
AttestationSubnetTopicFormat = GossipProtocolAndDigest + GossipAttestationMessage + "_%d"
// SyncCommitteeSubnetTopicFormat is the topic format for the sync committee subnet.
SyncCommitteeSubnetTopicFormat = "/eth2/%x/" + GossipSyncCommitteeMessage + "_%d"
SyncCommitteeSubnetTopicFormat = GossipProtocolAndDigest + GossipSyncCommitteeMessage + "_%d"
// BlockSubnetTopicFormat is the topic format for the block subnet.
BlockSubnetTopicFormat = "/eth2/%x/beacon_block"
BlockSubnetTopicFormat = GossipProtocolAndDigest + GossipBlockMessage
// ExitSubnetTopicFormat is the topic format for the voluntary exit subnet.
ExitSubnetTopicFormat = "/eth2/%x/voluntary_exit"
ExitSubnetTopicFormat = GossipProtocolAndDigest + GossipExitMessage
// ProposerSlashingSubnetTopicFormat is the topic format for the proposer slashing subnet.
ProposerSlashingSubnetTopicFormat = "/eth2/%x/proposer_slashing"
ProposerSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipProposerSlashingMessage
// AttesterSlashingSubnetTopicFormat is the topic format for the attester slashing subnet.
AttesterSlashingSubnetTopicFormat = "/eth2/%x/attester_slashing"
AttesterSlashingSubnetTopicFormat = GossipProtocolAndDigest + GossipAttesterSlashingMessage
// AggregateAndProofSubnetTopicFormat is the topic format for the aggregate and proof subnet.
AggregateAndProofSubnetTopicFormat = "/eth2/%x/beacon_aggregate_and_proof"
AggregateAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipAggregateAndProofMessage
// SyncContributionAndProofSubnetTopicFormat is the topic format for the sync aggregate and proof subnet.
SyncContributionAndProofSubnetTopicFormat = "/eth2/%x/sync_committee_contribution_and_proof"
SyncContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage
)
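
A quick demonstration of how the refactored constants compose into concrete topic strings: GossipProtocolAndDigest contributes the "/eth2/<digest>/" prefix and the message-name constants supply the suffix, so Sprintf fills in the fork digest and subnet index. The fork digest below is made up for illustration.

```go
// Composing a sync-committee subnet topic from the shared prefix constant.
package main

import "fmt"

const (
	gossipProtocolAndDigest        = "/eth2/%x/"
	gossipSyncCommitteeMessage     = "sync_committee"
	syncCommitteeSubnetTopicFormat = gossipProtocolAndDigest + gossipSyncCommitteeMessage + "_%d"
)

func main() {
	digest := [4]byte{0xaf, 0xca, 0xba, 0x0e} // illustrative digest only
	subnet := uint64(3)
	fmt.Println(fmt.Sprintf(syncCommitteeSubnetTopicFormat, digest, subnet))
	// Output: /eth2/afcaba0e/sync_committee_3
}
```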

View File

@@ -22,8 +22,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (proto.Message, error
if err != nil {
return nil, err
}
base, ok := p2p.GossipTopicMappings[topic]
if !ok {
base := p2p.GossipTopicMappings(topic, 0)
if base == nil {
return nil, p2p.ErrMessageNotMapped
}
m := proto.Clone(base)

View File

@@ -84,7 +84,7 @@ func (s *Service) updateMetrics() {
}
// We update all other gossip topics.
for topic := range p2p.GossipTopicMappings {
for _, topic := range p2p.AllTopics() {
// We already updated attestation subnet topics.
if strings.Contains(topic, "beacon_attestation") {
continue

View File

@@ -85,7 +85,7 @@ func (s *Service) registerSubscribers() {
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(topic string, validator pubsub.ValidatorEx, handle subHandler) *pubsub.Subscription {
base := p2p.GossipTopicMappings[topic]
base := p2p.GossipTopicMappings(topic, 0)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
@@ -198,7 +198,7 @@ func (s *Service) wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (s
// subscribe to a static subnet with the given topic and index.A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.ValidatorEx, handle subHandler) {
base := p2p.GossipTopicMappings[topic]
base := p2p.GossipTopicMappings(topic, 0)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
@@ -248,7 +248,7 @@ func (s *Service) subscribeDynamicWithSubnets(
validate pubsub.ValidatorEx,
handle subHandler,
) {
base := p2p.GossipTopicMappings[topicFormat]
base := p2p.GossipTopicMappings(topicFormat, 0)
if base == nil {
log.Fatalf("%s is not mapped to any message in GossipTopicMappings", topicFormat)
}