Compare commits

...

7 Commits

Author SHA1 Message Date
nisdas 765122dc05 fix one more test 2021-08-04 22:27:26 +08:00
nisdas a283fa58fb fixes 2021-08-04 21:25:23 +08:00
nisdas 12fe2d91ed fix tests 2021-08-04 19:47:35 +08:00
nisdas d44905329c Merge branch 'hf1' of https://github.com/prysmaticlabs/geth-sharding into interopFixes 2021-08-04 16:22:21 +08:00
nisdas c0310ad534 remove tracer 2021-08-04 15:25:59 +08:00
nisdas e10ac0af02 Merge branch 'hf1' of https://github.com/prysmaticlabs/geth-sharding into interopFixes 2021-08-04 15:22:11 +08:00
nisdas 6e358da5ed checkpoint fixes from the last few days 2021-08-04 15:19:45 +08:00
22 changed files with 430 additions and 66 deletions

View File

@@ -75,6 +75,7 @@ go_test(
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_google_gofuzz//:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",

View File

@@ -2,6 +2,8 @@ package altair
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
@@ -170,7 +172,7 @@ func SyncSubCommitteePubkeys(syncCommittee *ethpb.SyncCommittee, subComIdx types
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func IsSyncCommitteeAggregator(sig []byte) (bool, error) {
if len(sig) != params.BeaconConfig().BLSPubkeyLength {
if len(sig) != params.BeaconConfig().BLSSignatureLength {
return false, errors.New("incorrect sig length")
}
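For context: a BLS signature is 96 bytes while a pubkey is 48, so the old length check above would reject every well-formed signature. With mainnet presets (assuming SYNC_COMMITTEE_SIZE = 512, SYNC_COMMITTEE_SUBNET_COUNT = 4 and TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE = 16), the spec formula works out to modulo = max(1, 512 // 4 // 16) = 8, so roughly one in eight sync committee signatures marks its signer as an aggregator.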
@@ -179,3 +181,41 @@ func IsSyncCommitteeAggregator(sig []byte) (bool, error) {
hashedSig := hashutil.Hash(sig)
return bytesutil.FromBytes8(hashedSig[:8])%modulo == 0, nil
}
// ValidateSyncMessageTime validates a sync message to ensure that the provided slot is within the allowable time range.
func ValidateSyncMessageTime(slot types.Slot, genesisTime time.Time, clockDisparity time.Duration) error {
if err := helpers.ValidateSlotClock(slot, uint64(genesisTime.Unix())); err != nil {
return err
}
messageTime, err := helpers.SlotToTime(uint64(genesisTime.Unix()), slot)
if err != nil {
return err
}
currentSlot := helpers.SlotsSince(genesisTime)
slotStartTime, err := helpers.SlotToTime(uint64(genesisTime.Unix()), currentSlot)
if err != nil {
return err
}
lowestSlotBound := slotStartTime.Add(-clockDisparity)
currentLowerBound := time.Now().Add(-clockDisparity)
// In the event the slot's start time is before the
// current allowable bound, we set the slot's start
// time as the bound.
if slotStartTime.Before(currentLowerBound) {
lowestSlotBound = slotStartTime
}
lowerBound := lowestSlotBound
upperBound := time.Now().Add(clockDisparity)
// Verify sync message slot is within the time range.
if messageTime.Before(lowerBound) || messageTime.After(upperBound) {
return fmt.Errorf(
"sync message slot %d not within allowable range of %d to %d (current slot)",
slot,
lowerBound.Unix(),
upperBound.Unix(),
)
}
return nil
}
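A minimal caller sketch for the new helper, assuming the time, params, altair and eth2-types imports shown in the hunks above; the function name shouldDropSyncMessage is hypothetical, while the real callers added in this change are the p2p broadcaster and the sync message gossip validator further below.

// Hypothetical caller: decide whether to drop a sync message whose slot is untimely.
func shouldDropSyncMessage(slot types.Slot, genesis time.Time) bool {
	disparity := params.BeaconNetworkConfig().MaximumGossipClockDisparity
	if err := altair.ValidateSyncMessageTime(slot, genesis, disparity); err != nil {
		// The slot is before the allowed lower bound or after now + clock disparity.
		return true
	}
	return false
}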

View File

@@ -3,6 +3,7 @@ package altair_test
import (
"context"
"testing"
"time"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
@@ -11,7 +12,9 @@ import (
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
"github.com/prysmaticlabs/prysm/shared/timeutils"
)
func TestSyncCommitteeIndices_CanGet(t *testing.T) {
@@ -278,6 +281,107 @@ func TestSyncSubCommitteePubkeys_CanGet(t *testing.T) {
}
func Test_ValidateSyncMessageTime(t *testing.T) {
if params.BeaconNetworkConfig().MaximumGossipClockDisparity < 200*time.Millisecond {
t.Fatal("This test expects the maximum clock disparity to be at least 200ms")
}
type args struct {
syncMessageSlot types.Slot
genesisTime time.Time
}
tests := []struct {
name string
args args
wantedErr string
}{
{
name: "sync_message.slot == current_slot",
args: args{
syncMessageSlot: 15,
genesisTime: timeutils.Now().Add(-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second),
},
},
{
name: "sync_message.slot == current_slot, received in middle of slot",
args: args{
syncMessageSlot: 15,
genesisTime: timeutils.Now().Add(
-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second,
).Add(-(time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second)),
},
},
{
name: "sync_message.slot == current_slot, received 200ms early",
args: args{
syncMessageSlot: 16,
genesisTime: timeutils.Now().Add(
-16 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second,
).Add(-200 * time.Millisecond),
},
},
{
name: "sync_message.slot > current_slot",
args: args{
syncMessageSlot: 16,
genesisTime: timeutils.Now().Add(-(15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)),
},
wantedErr: "sync message slot 16 not within allowable range of",
},
{
name: "sync_message.slot == current_slot+CLOCK_DISPARITY",
args: args{
syncMessageSlot: 100,
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second - params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
},
wantedErr: "",
},
{
name: "sync_message.slot == current_slot+CLOCK_DISPARITY+200ms",
args: args{
syncMessageSlot: 100,
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second - params.BeaconNetworkConfig().MaximumGossipClockDisparity - 200*time.Millisecond)),
},
wantedErr: "sync message slot 100 not within allowable range of",
},
{
name: "sync_message.slot == current_slot-CLOCK_DISPARITY",
args: args{
syncMessageSlot: 100,
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second + params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
},
wantedErr: "",
},
{
name: "sync_message.slot > current_slot+CLOCK_DISPARITY",
args: args{
syncMessageSlot: 101,
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second + params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
},
wantedErr: "sync message slot 101 not within allowable range of",
},
{
name: "sync_message.slot is well beyond current slot",
args: args{
syncMessageSlot: 1 << 32,
genesisTime: timeutils.Now().Add(-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second),
},
wantedErr: "which exceeds max allowed value relative to the local clock",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := altair.ValidateSyncMessageTime(tt.args.syncMessageSlot, tt.args.genesisTime,
params.BeaconNetworkConfig().MaximumGossipClockDisparity)
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
assert.NoError(t, err)
}
})
}
}
func getState(t *testing.T, count uint64) *stateAltair.BeaconState {
validators := make([]*ethpb.Validator, count)
for i := 0; i < len(validators); i++ {

View File

@@ -40,6 +40,7 @@ go_library(
],
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
@@ -73,6 +74,7 @@ go_library(
"@com_github_ferranbt_fastssz//:go_default_library",
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_kr_pretty//:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//config:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/protocol/identify:go_default_library",

View File

@@ -9,6 +9,8 @@ import (
ssz "github.com/ferranbt/fastssz"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
@@ -126,6 +128,13 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
traceutil.AnnotateError(span, err)
}
}
// In the event our attestation is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
currSlot := helpers.CurrentSlot(uint64(s.genesisTime.Unix()))
if att.Data.Slot+params.BeaconConfig().SlotsPerEpoch < currSlot {
log.Warnf("Attestation is too old to broadcast, discarding it. Current Slot: %d , Attestation Slot: %d", currSlot, att.Data.Slot)
return
}
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
log.WithError(err).Error("Failed to broadcast attestation")
@@ -138,15 +147,17 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
oneEpoch := time.Duration(1*params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneEpoch)
oneSlot := time.Duration(1*params.BeaconConfig().SecondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()
// TODO: Change to sync committee subnet locker rather than sharing the same one with attestations.
// Ensure we have peers with this subnet.
s.subnetLocker(subnet).RLock()
// This adds in a special value to the subnet
// to ensure that we can re-use the same subnet locker.
wrappedSubIdx := subnet + syncLockerVal
s.subnetLocker(wrappedSubIdx).RLock()
hasPeer := s.hasPeerWithSubnet(syncCommitteeToTopic(subnet, forkDigest))
s.subnetLocker(subnet).RUnlock()
s.subnetLocker(wrappedSubIdx).RUnlock()
span.AddAttributes(
trace.BoolAttribute("hasPeer", hasPeer),
@@ -157,8 +168,8 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
if !hasPeer {
syncCommitteeBroadcastAttempts.Inc()
if err := func() error {
s.subnetLocker(subnet).Lock()
defer s.subnetLocker(subnet).Unlock()
s.subnetLocker(wrappedSubIdx).Lock()
defer s.subnetLocker(wrappedSubIdx).Unlock()
ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1)
if err != nil {
return err
@@ -173,6 +184,12 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
traceutil.AnnotateError(span, err)
}
}
// In the event our sync message is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
if err := altair.ValidateSyncMessageTime(sMsg.Slot, s.genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
log.Warnf("Sync Committee Message is too old to broadcast, discarding it. %v", err)
return
}
if err := s.broadcastObject(ctx, sMsg, syncCommitteeToTopic(subnet, forkDigest)); err != nil {
log.WithError(err).Error("Failed to broadcast sync committee message")

View File

@@ -7,14 +7,19 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
)
var _ pubsub.SubscriptionFilter = (*Service)(nil)
const pubsubSubscriptionRequestLimit = 100
// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
@@ -37,22 +42,12 @@ func (s *Service) CanSubscribe(topic string) bool {
log.WithError(err).Error("Could not determine fork digest")
return false
}
isForkNextEpoch, err := p2putils.IsForkNextEpoch(s.genesisTime, s.genesisValidatorsRoot)
digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine next fork epoch")
log.WithError(err).Error("Could not determine next fork digest")
return false
}
if isForkNextEpoch {
currEpoch := helpers.SlotToEpoch(helpers.CurrentSlot(uint64(s.genesisTime.Unix())))
digest, err := p2putils.ForkDigestFromEpoch(currEpoch+1, s.genesisValidatorsRoot)
if err != nil {
log.WithError(err).Error("Could not determine next fork digest")
return false
}
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
return false
}
} else if parts[2] != fmt.Sprintf("%x", fd) {
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
return false
}
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
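For reference, the worst case enumerated in the comment on the raised subscription limit above is (64 attestation subnets + 4 sync committee subnets + 6 global topics) * 2 fork digests = 148 concurrent subscriptions, so 200 leaves headroom where the previous limit of 100 could have been exceeded at the Altair fork boundary.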

View File

@@ -4,11 +4,13 @@ import (
"context"
ssz "github.com/ferranbt/fastssz"
"github.com/kr/pretty"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -25,6 +27,11 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
topic := baseTopic + s.Encoding().ProtocolSuffix()
span.AddAttributes(trace.StringAttribute("topic", topic))
log.WithFields(logrus.Fields{
"topic": topic,
"request": pretty.Sprint(message),
}).Tracef("Sending RPC request to peer %s", pid.String())
// Apply max dial timeout when opening a new stream.
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()

View File

@@ -129,7 +129,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
s.host = h
s.host.RemoveStreamHandler(identify.IDDelta)
// Gossipsub registration is done before we add in any new peers
// due to libp2p's gossipsub implementation not taking into
// account previously added peers when creating the gossipsub

View File

@@ -22,6 +22,11 @@ var syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
// The value used with the subnet, in order
// to create an appropriate key to retrieve
// the relevant lock.
const syncLockerVal = 100
// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then we try to connect
// with those peers. This method will block until the required amount of
@@ -237,6 +242,13 @@ func syncBitvector(record *enr.Record) (bitfield.Bitvector4, error) {
return bitV, nil
}
// The subnet locker is a map which keeps track of all
// mutexes stored per subnet. This locker is re-used
// between both the attestation and sync subnets. In
// order to differentiate between attestation and sync
// subnets, sync subnets are stored as (subnet+syncLockerVal). This
// prevents conflicts while allowing both subnets
// to use a single locker.
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
s.subnetsLockLock.Lock()
defer s.subnetsLockLock.Unlock()
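A minimal sketch of the keying scheme the comment above describes; the helper name lockSyncSubnet is hypothetical and the offset is the syncLockerVal constant (100), which sits safely above the 64 attestation subnet keys.

// Hypothetical helper: lock the shared subnet locker for a sync committee subnet.
// Sync subnet n is keyed at n+syncLockerVal, e.g. sync subnet 3 -> key 103,
// so it never contends with attestation subnet 3, which uses key 3.
func (s *Service) lockSyncSubnet(subnet uint64) func() {
	l := s.subnetLocker(subnet + syncLockerVal)
	l.Lock()
	return l.Unlock
}

A caller could then write defer s.lockSyncSubnet(subnet)() on the write path; the broadcaster in this change instead inlines subnet + syncLockerVal as wrappedSubIdx.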

View File

@@ -107,6 +107,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opencensus_go//trace:go_default_library",

View File

@@ -97,10 +97,7 @@ func (s *Service) checkForPreviousEpochFork(currEpoch types.Epoch) error {
continue
}
if retDigest == prevDigest {
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(t); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
s.subHandler.removeTopic(t)
s.unSubscribeFromTopic(t)
}
}
}

View File

@@ -7,6 +7,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
@@ -20,6 +22,12 @@ var (
Help: "The number of peers subscribed to a given topic.",
}, []string{"topic"},
)
subscribedTopicPeerCount = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "p2p_subscribed_topic_peer_count",
Help: "The number of peers subscribed to topics that a host node is also subscribed to.",
}, []string{"topic"},
)
messageReceivedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "p2p_message_received_total",
@@ -34,6 +42,13 @@ var (
},
[]string{"topic"},
)
messageIgnoredValidationCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "p2p_message_ignored_validation_total",
Help: "Count of messages that were ignored in validation.",
},
[]string{"topic"},
)
messageFailedProcessingCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "p2p_message_failed_processing_total",
@@ -69,24 +84,35 @@ func (s *Service) updateMetrics() {
log.WithError(err).Debugf("Could not compute fork digest")
}
indices := s.aggregatorSubnetIndices(s.cfg.Chain.CurrentSlot())
syncIndices := cache.SyncSubnetIDs.GetAllSubnets(helpers.SlotToEpoch(s.cfg.Chain.CurrentSlot()))
attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})]
attTopic += s.cfg.P2P.Encoding().ProtocolSuffix()
syncTopic += s.cfg.P2P.Encoding().ProtocolSuffix()
if flags.Get().SubscribeToAllSubnets {
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
formattedTopic := fmt.Sprintf(attTopic, digest, i)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
}
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
formattedTopic := fmt.Sprintf(syncTopic, digest, i)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
}
} else {
for _, committeeIdx := range indices {
formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
}
for _, committeeIdx := range syncIndices {
formattedTopic := fmt.Sprintf(syncTopic, digest, committeeIdx)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
}
}
// We update all other gossip topics.
for _, topic := range p2p.AllTopics() {
// We already updated attestation subnet topics.
if strings.Contains(topic, "beacon_attestation") {
if strings.Contains(topic, p2p.GossipAttestationMessage) || strings.Contains(topic, p2p.GossipSyncCommitteeMessage) {
continue
}
topic += s.cfg.P2P.Encoding().ProtocolSuffix()
@@ -97,4 +123,8 @@ func (s *Service) updateMetrics() {
formattedTopic := fmt.Sprintf(topic, digest)
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
}
for _, topic := range s.cfg.P2P.PubSub().GetTopics() {
subscribedTopicPeerCount.WithLabelValues(topic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(topic))))
}
}

View File

@@ -6,11 +6,14 @@ import (
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -36,14 +39,8 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
}
return nilErr
}
topicVersion := ""
switch s.cfg.P2P.Metadata().Version() {
case version.Phase0:
topicVersion = p2p.SchemaVersionV1
case version.Altair:
topicVersion = p2p.SchemaVersionV2
}
if err := validateVersion(topicVersion, stream); err != nil {
_, _, streamVersion, err := p2p.TopicDeconstructor(string(stream.Protocol()))
if err != nil {
resp, genErr := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
if genErr != nil {
log.WithError(genErr).Debug("Could not generate a response error")
@@ -52,10 +49,34 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
}
return err
}
currMd := s.cfg.P2P.Metadata()
switch streamVersion {
case p2p.SchemaVersionV1:
// We have a v1 metadata object saved locally, so we
// convert it back to a v0 metadata object.
if currMd.Version() != version.Phase0 {
currMd = wrapper.WrappedMetadataV0(
&pb.MetaDataV0{
Attnets: currMd.AttnetsBitfield(),
SeqNumber: currMd.SequenceNumber(),
})
}
case p2p.SchemaVersionV2:
// We have a v0 metadata object saved locally, so we
// convert it to a v1 metadata object.
if currMd.Version() != version.Altair {
currMd = wrapper.WrappedMetadataV1(
&pb.MetaDataV1{
Attnets: currMd.AttnetsBitfield(),
SeqNumber: currMd.SequenceNumber(),
Syncnets: bitfield.Bitvector4{byte(0x00)},
})
}
}
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
_, err := s.cfg.P2P.Encoding().EncodeWithMaxLength(stream, s.cfg.P2P.Metadata())
_, err = s.cfg.P2P.Encoding().EncodeWithMaxLength(stream, currMd)
if err != nil {
return err
}

View File

@@ -2,7 +2,6 @@ package sync
import (
"context"
"fmt"
"reflect"
"sync"
"testing"
@@ -17,7 +16,6 @@ import (
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
@@ -189,11 +187,11 @@ func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) {
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
err := r2.metaDataHandler(context.Background(), new(interface{}), stream)
assert.ErrorContains(t, fmt.Sprintf("stream version of %s doesn't match provided version %s", p2p.SchemaVersionV2, p2p.SchemaVersionV1), err)
assert.NoError(t, err)
})
_, err := r.sendMetaDataRequest(context.Background(), p2.BHost.ID())
assert.ErrorContains(t, types.ErrGeneric.Error(), err)
assert.NoError(t, err)
if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")

View File

@@ -178,10 +178,7 @@ func (s *Service) Stop() error {
}
// Deregister Topic Subscribers.
for _, t := range s.cfg.P2P.PubSub().GetTopics() {
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(t); err != nil {
log.Errorf("Could not successfully unregister for topic %s: %v", t, err)
}
s.subHandler.removeTopic(t)
s.unSubscribeFromTopic(t)
}
defer s.cancel()
return nil

View File

@@ -213,6 +213,7 @@ func (s *Service) subscribeWithBase(topic string, validator pubsub.ValidatorEx,
}
go messageLoop()
log.WithField("topic", topic).Info("Subscribed to topic")
return sub
}
@@ -231,13 +232,30 @@ func (s *Service) wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (s
}
// Ignore any messages received before chainstart.
if s.chainStarted.IsNotSet() {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
return pubsub.ValidationIgnore
}
retDigest, err := p2p.ExtractGossipDigest(topic)
if err != nil {
log.WithField("topic", topic).Errorf("Invalid topic format of pubsub topic: %v", err)
return pubsub.ValidationIgnore
}
currDigest, err := s.currentForkDigest()
if err != nil {
log.WithField("topic", topic).Errorf("Unable to retrieve fork data: %v", err)
return pubsub.ValidationIgnore
}
if currDigest != retDigest {
log.WithField("topic", topic).Warnf("Received message from outdated fork digest %#x", retDigest)
return pubsub.ValidationIgnore
}
b := v(ctx, pid, msg)
if b == pubsub.ValidationReject {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
}
if b == pubsub.ValidationIgnore {
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
}
return b
}
}
@@ -280,10 +298,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
// Unsubscribes from all our current subnets.
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.P2P.Encoding().ProtocolSuffix()
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
s.subHandler.removeTopic(fullTopic)
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
@@ -386,10 +401,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
if !wanted && v != nil {
v.Cancel()
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.P2P.Encoding().ProtocolSuffix()
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
s.subHandler.removeTopic(fullTopic)
s.unSubscribeFromTopic(fullTopic)
delete(subscriptions, k)
}
}
@@ -485,10 +497,7 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator pubsub.
// Unsubscribes from all our current subnets.
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.P2P.Encoding().ProtocolSuffix()
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
s.subHandler.removeTopic(fullTopic)
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
@@ -588,6 +597,21 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
}
}
func (s *Service) unSubscribeFromTopic(topic string) {
log.WithField("topic", topic).Debug("Unsubscribing from topic")
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(topic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
sub := s.subHandler.subForTopic(topic)
if sub != nil {
sub.Cancel()
}
s.subHandler.removeTopic(topic)
if err := s.cfg.P2P.LeaveTopic(topic); err != nil {
log.WithError(err).Error("Unable to leave topic")
}
}
// find if we have peers who are subscribed to the same subnet
func (s *Service) validPeersExist(subnetTopic string) bool {
numOfPeers := s.cfg.P2P.PubSub().ListPeers(subnetTopic + s.cfg.P2P.Encoding().ProtocolSuffix())

View File

@@ -18,11 +18,13 @@ import (
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/abool"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
@@ -280,6 +282,13 @@ func TestStaticSubnets(t *testing.T) {
}
func Test_wrapAndReportValidation(t *testing.T) {
mChain := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{0x01},
}
fd, err := p2putils.CreateForkDigest(mChain.GenesisTime(), mChain.ValidatorsRoot[:])
assert.NoError(t, err)
mockTopic := fmt.Sprintf(p2p.BlockSubnetTopicFormat, fd) + encoder.SszNetworkEncoder{}.ProtocolSuffix()
type args struct {
topic string
v pubsub.ValidatorEx
@@ -333,7 +342,7 @@ func Test_wrapAndReportValidation(t *testing.T) {
{
name: "validator OK",
args: args{
topic: "foo",
topic: mockTopic,
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationAccept
},
@@ -341,7 +350,7 @@ func Test_wrapAndReportValidation(t *testing.T) {
msg: &pubsub.Message{
Message: &pubsubpb.Message{
Topic: func() *string {
s := "foo"
s := mockTopic
return &s
}(),
},
@@ -371,6 +380,9 @@ func Test_wrapAndReportValidation(t *testing.T) {
chainStarted.SetTo(tt.args.chainstarted)
s := &Service{
chainStarted: chainStarted,
cfg: &Config{
Chain: mChain,
},
}
_, v := s.wrapAndReportValidation(tt.args.topic, tt.args.v)
got := v(context.Background(), tt.args.pid, tt.args.msg)

View File

@@ -82,3 +82,9 @@ func (s *subTopicHandler) allTopics() []string {
}
return topics
}
func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
s.RLock()
defer s.RUnlock()
return s.subTopics[topic]
}

View File

@@ -141,7 +141,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
}
// Handle block when the parent is unknown.
if !s.cfg.DB.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) {
if !s.cfg.DB.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) && !s.cfg.Chain.HasInitSyncBlock(bytesutil.ToBytes32(blk.Block().ParentRoot())) {
s.pendingQueueLock.Lock()
if err := s.insertBlockToPendingQueue(blk.Block().Slot(), blk, blockRoot); err != nil {
s.pendingQueueLock.Unlock()

View File

@@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
@@ -54,7 +55,7 @@ func (s *Service) validateSyncCommitteeMessage(ctx context.Context, pid peer.ID,
}
// The message's `slot` is for the current slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
if err := helpers.VerifySlotTime(uint64(s.cfg.Chain.GenesisTime().Unix()), m.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
if err := altair.ValidateSyncMessageTime(m.Slot, s.cfg.Chain.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
traceutil.AnnotateError(span, err)
return pubsub.ValidationIgnore
}

View File

@@ -533,7 +533,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
want: pubsub.ValidationReject,
},
{
name: "Valid Signed Sync Contribution And Proof",
name: "Invalid Signed Sync Contribution And Proof - Zero Bits Set",
svc: NewService(context.Background(), &Config{
P2P: mockp2p.NewTestP2P(t),
InitialSync: &mockSync.Sync{IsSyncing: false},
@@ -596,6 +596,102 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
assert.NoError(t, s.initCaches())
return s
},
args: args{
ctx: context.Background(),
pid: "random",
topic: defaultTopic,
msg: &ethpb.SignedContributionAndProof{
Message: &ethpb.ContributionAndProof{
AggregatorIndex: 1,
Contribution: &ethpb.SyncCommitteeContribution{
Slot: 1,
SubcommitteeIndex: 1,
BlockRoot: params.BeaconConfig().ZeroHash[:],
AggregationBits: bitfield.NewBitvector128(),
Signature: emptySig[:],
},
SelectionProof: emptySig[:],
},
Signature: emptySig[:],
}},
want: pubsub.ValidationReject,
},
{
name: "Valid Signed Sync Contribution And Proof - Single Bit Set",
svc: NewService(context.Background(), &Config{
P2P: mockp2p.NewTestP2P(t),
InitialSync: &mockSync.Sync{IsSyncing: false},
Chain: chainService,
StateNotifier: chainService.StateNotifier(),
OperationNotifier: chainService.OperationNotifier(),
}),
setupSvc: func(s *Service, msg *ethpb.SignedContributionAndProof) *Service {
s.cfg.StateGen = stategen.New(db)
msg.Message.Contribution.BlockRoot = headRoot[:]
s.cfg.DB = db
hState, err := db.State(context.Background(), headRoot)
assert.NoError(t, err)
sc, err := hState.CurrentSyncCommittee()
assert.NoError(t, err)
cd, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(helpers.PrevSlot(hState.Slot())), params.BeaconConfig().DomainContributionAndProof, hState.GenesisValidatorRoot())
assert.NoError(t, err)
d, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(hState.Slot()), params.BeaconConfig().DomainSyncCommittee, hState.GenesisValidatorRoot())
assert.NoError(t, err)
var pubkeys [][]byte
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
coms, err := altair.SyncSubCommitteePubkeys(sc, types.CommitteeIndex(i))
pubkeys = coms
assert.NoError(t, err)
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := syncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
isAggregator, err := altair.IsSyncCommitteeAggregator(sig.Marshal())
require.NoError(t, err)
if isAggregator {
msg.Message.AggregatorIndex = idx
msg.Message.SelectionProof = sig.Marshal()
msg.Message.Contribution.Slot = helpers.PrevSlot(hState.Slot())
msg.Message.Contribution.SubcommitteeIndex = i
msg.Message.Contribution.BlockRoot = headRoot[:]
msg.Message.Contribution.AggregationBits = bitfield.NewBitvector128()
// Only Sign for 1 validator.
rawBytes := p2ptypes.SSZBytes(headRoot[:])
sigRoot, err := helpers.ComputeSigningRoot(&rawBytes, d)
assert.NoError(t, err)
valIdx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(coms[0]))
assert.Equal(t, true, ok)
sig = keys[valIdx].Sign(sigRoot[:])
msg.Message.Contribution.AggregationBits.SetBitAt(uint64(0), true)
msg.Message.Contribution.Signature = sig.Marshal()
sigRoot, err = helpers.ComputeSigningRoot(msg.Message, cd)
assert.NoError(t, err)
contrSig := keys[idx].Sign(sigRoot[:])
msg.Signature = contrSig.Marshal()
break
}
}
}
pd, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(helpers.PrevSlot(hState.Slot())), params.BeaconConfig().DomainSyncCommitteeSelectionProof, hState.GenesisValidatorRoot())
require.NoError(t, err)
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
s.cfg.Chain = &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(hState.Slot()-1)),
CurrentSyncCommitteeIndices: []types.CommitteeIndex{types.CommitteeIndex(msg.Message.Contribution.SubcommitteeIndex * subCommitteeSize)},
PublicKey: bytesutil.ToBytes48(keys[msg.Message.AggregatorIndex].PublicKey().Marshal()),
SyncSelectionProofDomain: pd,
SyncContributionProofDomain: cd,
SyncCommitteeDomain: d,
SyncCommitteePubkeys: pubkeys,
}
assert.NoError(t, s.initCaches())
return s
},
args: args{
ctx: context.Background(),
pid: "random",

View File

@@ -349,6 +349,8 @@ func TestSubmitSignedContributionAndProof_CouldNotSubmitContribution(t *testing.
SignatureDomain: make([]byte, 32),
}, nil)
aggBits := bitfield.NewBitvector128()
aggBits.SetBitAt(0, true)
m.validatorClient.EXPECT().GetSyncCommitteeContribution(
gomock.Any(), // ctx
&ethpb.SyncCommitteeContributionRequest{
@@ -359,7 +361,7 @@ func TestSubmitSignedContributionAndProof_CouldNotSubmitContribution(t *testing.
).Return(&ethpb.SyncCommitteeContribution{
BlockRoot: make([]byte, 32),
Signature: make([]byte, 96),
AggregationBits: bitfield.NewBitvector128(),
AggregationBits: aggBits,
}, nil)
m.validatorClient.EXPECT().
@@ -425,6 +427,8 @@ func TestSubmitSignedContributionAndProof_Ok(t *testing.T) {
SignatureDomain: make([]byte, 32),
}, nil)
aggBits := bitfield.NewBitvector128()
aggBits.SetBitAt(0, true)
m.validatorClient.EXPECT().GetSyncCommitteeContribution(
gomock.Any(), // ctx
&ethpb.SyncCommitteeContributionRequest{
@@ -435,7 +439,7 @@ func TestSubmitSignedContributionAndProof_Ok(t *testing.T) {
).Return(&ethpb.SyncCommitteeContribution{
BlockRoot: make([]byte, 32),
Signature: make([]byte, 96),
AggregationBits: bitfield.NewBitvector128(),
AggregationBits: aggBits,
}, nil)
m.validatorClient.EXPECT().