mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
7 Commits
v2.0.0-rc.
...
interopFix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
765122dc05 | ||
|
|
a283fa58fb | ||
|
|
12fe2d91ed | ||
|
|
d44905329c | ||
|
|
c0310ad534 | ||
|
|
e10ac0af02 | ||
|
|
6e358da5ed |
@@ -75,6 +75,7 @@ go_test(
|
||||
"//shared/testutil:go_default_library",
|
||||
"//shared/testutil/assert:go_default_library",
|
||||
"//shared/testutil/require:go_default_library",
|
||||
"//shared/timeutils:go_default_library",
|
||||
"//shared/trieutil:go_default_library",
|
||||
"@com_github_google_gofuzz//:go_default_library",
|
||||
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
|
||||
|
||||
@@ -2,6 +2,8 @@ package altair
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
@@ -170,7 +172,7 @@ func SyncSubCommitteePubkeys(syncCommittee *ethpb.SyncCommittee, subComIdx types
|
||||
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
|
||||
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
|
||||
func IsSyncCommitteeAggregator(sig []byte) (bool, error) {
|
||||
if len(sig) != params.BeaconConfig().BLSPubkeyLength {
|
||||
if len(sig) != params.BeaconConfig().BLSSignatureLength {
|
||||
return false, errors.New("incorrect sig length")
|
||||
}
|
||||
|
||||
@@ -179,3 +181,41 @@ func IsSyncCommitteeAggregator(sig []byte) (bool, error) {
|
||||
hashedSig := hashutil.Hash(sig)
|
||||
return bytesutil.FromBytes8(hashedSig[:8])%modulo == 0, nil
|
||||
}
|
||||
|
||||
// Validate Sync Message to ensure that the provided slot is valid.
|
||||
func ValidateSyncMessageTime(slot types.Slot, genesisTime time.Time, clockDisparity time.Duration) error {
|
||||
if err := helpers.ValidateSlotClock(slot, uint64(genesisTime.Unix())); err != nil {
|
||||
return err
|
||||
}
|
||||
messageTime, err := helpers.SlotToTime(uint64(genesisTime.Unix()), slot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
currentSlot := helpers.SlotsSince(genesisTime)
|
||||
slotStartTime, err := helpers.SlotToTime(uint64(genesisTime.Unix()), currentSlot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lowestSlotBound := slotStartTime.Add(-clockDisparity)
|
||||
currentLowerBound := time.Now().Add(-clockDisparity)
|
||||
// In the event the Slot's start time, is before the
|
||||
// current allowable bound, we set the slot's start
|
||||
// time as the bound.
|
||||
if slotStartTime.Before(currentLowerBound) {
|
||||
lowestSlotBound = slotStartTime
|
||||
}
|
||||
|
||||
lowerBound := lowestSlotBound
|
||||
upperBound := time.Now().Add(clockDisparity)
|
||||
// Verify sync message slot is within the time range.
|
||||
if messageTime.Before(lowerBound) || messageTime.After(upperBound) {
|
||||
return fmt.Errorf(
|
||||
"sync message slot %d not within allowable range of %d to %d (current slot)",
|
||||
slot,
|
||||
lowerBound.Unix(),
|
||||
upperBound.Unix(),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package altair_test
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
|
||||
@@ -11,7 +12,9 @@ import (
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bls"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
"github.com/prysmaticlabs/prysm/shared/timeutils"
|
||||
)
|
||||
|
||||
func TestSyncCommitteeIndices_CanGet(t *testing.T) {
|
||||
@@ -278,6 +281,107 @@ func TestSyncSubCommitteePubkeys_CanGet(t *testing.T) {
|
||||
|
||||
}
|
||||
|
||||
func Test_ValidateSyncMessageTime(t *testing.T) {
|
||||
if params.BeaconNetworkConfig().MaximumGossipClockDisparity < 200*time.Millisecond {
|
||||
t.Fatal("This test expects the maximum clock disparity to be at least 200ms")
|
||||
}
|
||||
|
||||
type args struct {
|
||||
syncMessageSlot types.Slot
|
||||
genesisTime time.Time
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantedErr string
|
||||
}{
|
||||
{
|
||||
name: "sync_message.slot == current_slot",
|
||||
args: args{
|
||||
syncMessageSlot: 15,
|
||||
genesisTime: timeutils.Now().Add(-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot == current_slot, received in middle of slot",
|
||||
args: args{
|
||||
syncMessageSlot: 15,
|
||||
genesisTime: timeutils.Now().Add(
|
||||
-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second,
|
||||
).Add(-(time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second)),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot == current_slot, received 200ms early",
|
||||
args: args{
|
||||
syncMessageSlot: 16,
|
||||
genesisTime: timeutils.Now().Add(
|
||||
-16 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second,
|
||||
).Add(-200 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot > current_slot",
|
||||
args: args{
|
||||
syncMessageSlot: 16,
|
||||
genesisTime: timeutils.Now().Add(-(15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)),
|
||||
},
|
||||
wantedErr: "sync message slot 16 not within allowable range of",
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot == current_slot+CLOCK_DISPARITY",
|
||||
args: args{
|
||||
syncMessageSlot: 100,
|
||||
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second - params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
|
||||
},
|
||||
wantedErr: "",
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot == current_slot+CLOCK_DISPARITY+200ms",
|
||||
args: args{
|
||||
syncMessageSlot: 100,
|
||||
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second - params.BeaconNetworkConfig().MaximumGossipClockDisparity - 200*time.Millisecond)),
|
||||
},
|
||||
wantedErr: "sync message slot 100 not within allowable range of",
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot == current_slot-CLOCK_DISPARITY",
|
||||
args: args{
|
||||
syncMessageSlot: 100,
|
||||
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second + params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
|
||||
},
|
||||
wantedErr: "",
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot > current_slot+CLOCK_DISPARITY",
|
||||
args: args{
|
||||
syncMessageSlot: 101,
|
||||
genesisTime: timeutils.Now().Add(-(100*time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second + params.BeaconNetworkConfig().MaximumGossipClockDisparity)),
|
||||
},
|
||||
wantedErr: "sync message slot 101 not within allowable range of",
|
||||
},
|
||||
{
|
||||
name: "sync_message.slot is well beyond current slot",
|
||||
args: args{
|
||||
syncMessageSlot: 1 << 32,
|
||||
genesisTime: timeutils.Now().Add(-15 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second),
|
||||
},
|
||||
wantedErr: "which exceeds max allowed value relative to the local clock",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
err := altair.ValidateSyncMessageTime(tt.args.syncMessageSlot, tt.args.genesisTime,
|
||||
params.BeaconNetworkConfig().MaximumGossipClockDisparity)
|
||||
if tt.wantedErr != "" {
|
||||
assert.ErrorContains(t, tt.wantedErr, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getState(t *testing.T, count uint64) *stateAltair.BeaconState {
|
||||
validators := make([]*ethpb.Validator, count)
|
||||
for i := 0; i < len(validators); i++ {
|
||||
|
||||
@@ -40,6 +40,7 @@ go_library(
|
||||
],
|
||||
deps = [
|
||||
"//beacon-chain/cache:go_default_library",
|
||||
"//beacon-chain/core/altair:go_default_library",
|
||||
"//beacon-chain/core/feed:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
@@ -73,6 +74,7 @@ go_library(
|
||||
"@com_github_ferranbt_fastssz//:go_default_library",
|
||||
"@com_github_ipfs_go_ipfs_addr//:go_default_library",
|
||||
"@com_github_kevinms_leakybucket_go//:go_default_library",
|
||||
"@com_github_kr_pretty//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//config:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//p2p/protocol/identify:go_default_library",
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
|
||||
ssz "github.com/ferranbt/fastssz"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/hashutil"
|
||||
@@ -126,6 +128,13 @@ func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
}
|
||||
// In the event our attestation is outdated and beyond the
|
||||
// acceptable threshold, we exit early and do not broadcast it.
|
||||
currSlot := helpers.CurrentSlot(uint64(s.genesisTime.Unix()))
|
||||
if att.Data.Slot+params.BeaconConfig().SlotsPerEpoch < currSlot {
|
||||
log.Warnf("Attestation is too old to broadcast, discarding it. Current Slot: %d , Attestation Slot: %d", currSlot, att.Data.Slot)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast attestation")
|
||||
@@ -138,15 +147,17 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
|
||||
defer span.End()
|
||||
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
|
||||
|
||||
oneEpoch := time.Duration(1*params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, oneEpoch)
|
||||
oneSlot := time.Duration(1*params.BeaconConfig().SecondsPerSlot) * time.Second
|
||||
ctx, cancel := context.WithTimeout(ctx, oneSlot)
|
||||
defer cancel()
|
||||
|
||||
// TODO: Change to sync committee subnet locker rather than sharing the same one with attestations.
|
||||
// Ensure we have peers with this subnet.
|
||||
s.subnetLocker(subnet).RLock()
|
||||
// This adds in a special value to the subnet
|
||||
// to ensure that we can re-use the same subnet locker.
|
||||
wrappedSubIdx := subnet + syncLockerVal
|
||||
s.subnetLocker(wrappedSubIdx).RLock()
|
||||
hasPeer := s.hasPeerWithSubnet(syncCommitteeToTopic(subnet, forkDigest))
|
||||
s.subnetLocker(subnet).RUnlock()
|
||||
s.subnetLocker(wrappedSubIdx).RUnlock()
|
||||
|
||||
span.AddAttributes(
|
||||
trace.BoolAttribute("hasPeer", hasPeer),
|
||||
@@ -157,8 +168,8 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
|
||||
if !hasPeer {
|
||||
syncCommitteeBroadcastAttempts.Inc()
|
||||
if err := func() error {
|
||||
s.subnetLocker(subnet).Lock()
|
||||
defer s.subnetLocker(subnet).Unlock()
|
||||
s.subnetLocker(wrappedSubIdx).Lock()
|
||||
defer s.subnetLocker(wrappedSubIdx).Unlock()
|
||||
ok, err := s.FindPeersWithSubnet(ctx, syncCommitteeToTopic(subnet, forkDigest), subnet, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -173,6 +184,12 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
|
||||
traceutil.AnnotateError(span, err)
|
||||
}
|
||||
}
|
||||
// In the event our sync message is outdated and beyond the
|
||||
// acceptable threshold, we exit early and do not broadcast it.
|
||||
if err := altair.ValidateSyncMessageTime(sMsg.Slot, s.genesisTime, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
|
||||
log.Warnf("Sync Committee Message is too old to broadcast, discarding it. %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.broadcastObject(ctx, sMsg, syncCommitteeToTopic(subnet, forkDigest)); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast sync committee message")
|
||||
|
||||
@@ -7,14 +7,19 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2putils"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
)
|
||||
|
||||
var _ pubsub.SubscriptionFilter = (*Service)(nil)
|
||||
|
||||
const pubsubSubscriptionRequestLimit = 100
|
||||
// It is set at this limit to handle the possibility
|
||||
// of double topic subscriptions at fork boundaries.
|
||||
// -> 64 Attestation Subnets * 2.
|
||||
// -> 4 Sync Committee Subnets * 2.
|
||||
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
|
||||
const pubsubSubscriptionRequestLimit = 200
|
||||
|
||||
// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
|
||||
func (s *Service) CanSubscribe(topic string) bool {
|
||||
@@ -37,22 +42,12 @@ func (s *Service) CanSubscribe(topic string) bool {
|
||||
log.WithError(err).Error("Could not determine fork digest")
|
||||
return false
|
||||
}
|
||||
isForkNextEpoch, err := p2putils.IsForkNextEpoch(s.genesisTime, s.genesisValidatorsRoot)
|
||||
digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, s.genesisValidatorsRoot)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not determine next fork epoch")
|
||||
log.WithError(err).Error("Could not determine next fork digest")
|
||||
return false
|
||||
}
|
||||
if isForkNextEpoch {
|
||||
currEpoch := helpers.SlotToEpoch(helpers.CurrentSlot(uint64(s.genesisTime.Unix())))
|
||||
digest, err := p2putils.ForkDigestFromEpoch(currEpoch+1, s.genesisValidatorsRoot)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not determine next fork digest")
|
||||
return false
|
||||
}
|
||||
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
|
||||
return false
|
||||
}
|
||||
} else if parts[2] != fmt.Sprintf("%x", fd) {
|
||||
if parts[2] != fmt.Sprintf("%x", fd) && parts[2] != fmt.Sprintf("%x", digest) {
|
||||
return false
|
||||
}
|
||||
if parts[4] != encoder.ProtocolSuffixSSZSnappy {
|
||||
|
||||
@@ -4,11 +4,13 @@ import (
|
||||
"context"
|
||||
|
||||
ssz "github.com/ferranbt/fastssz"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/shared/traceutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
@@ -25,6 +27,11 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
|
||||
topic := baseTopic + s.Encoding().ProtocolSuffix()
|
||||
span.AddAttributes(trace.StringAttribute("topic", topic))
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"topic": topic,
|
||||
"request": pretty.Sprint(message),
|
||||
}).Tracef("Sending RPC request to peer %s", pid.String())
|
||||
|
||||
// Apply max dial timeout when opening a new stream.
|
||||
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
@@ -129,7 +129,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
|
||||
|
||||
s.host = h
|
||||
s.host.RemoveStreamHandler(identify.IDDelta)
|
||||
|
||||
// Gossipsub registration is done before we add in any new peers
|
||||
// due to libp2p's gossipsub implementation not taking into
|
||||
// account previously added peers when creating the gossipsub
|
||||
|
||||
@@ -22,6 +22,11 @@ var syncCommsSubnetCount = params.BeaconConfig().SyncCommitteeSubnetCount
|
||||
var attSubnetEnrKey = params.BeaconNetworkConfig().AttSubnetKey
|
||||
var syncCommsSubnetEnrKey = params.BeaconNetworkConfig().SyncCommsSubnetKey
|
||||
|
||||
// The value used with the subnet, inorder
|
||||
// to create an appropriate key tor retrieve
|
||||
// the relevant lock.
|
||||
const syncLockerVal = 100
|
||||
|
||||
// FindPeersWithSubnet performs a network search for peers
|
||||
// subscribed to a particular subnet. Then we try to connect
|
||||
// with those peers. This method will block until the required amount of
|
||||
@@ -237,6 +242,13 @@ func syncBitvector(record *enr.Record) (bitfield.Bitvector4, error) {
|
||||
return bitV, nil
|
||||
}
|
||||
|
||||
// The subnet locker is a map which keeps track of all
|
||||
// mutexes stored per subnet. This locker is re-used
|
||||
// between both the attestation and sync subnets. In
|
||||
// order to differentiate between attestation and sync
|
||||
// subnets. Sync subnets are stored by (subnet+syncLockerVal). This
|
||||
// is to prevent conflicts while allowing both subnets
|
||||
// to use a single locker.
|
||||
func (s *Service) subnetLocker(i uint64) *sync.RWMutex {
|
||||
s.subnetsLockLock.Lock()
|
||||
defer s.subnetsLockLock.Unlock()
|
||||
|
||||
@@ -107,6 +107,7 @@ go_library(
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
|
||||
"@io_opencensus_go//trace:go_default_library",
|
||||
|
||||
@@ -97,10 +97,7 @@ func (s *Service) checkForPreviousEpochFork(currEpoch types.Epoch) error {
|
||||
continue
|
||||
}
|
||||
if retDigest == prevDigest {
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(t); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
s.subHandler.removeTopic(t)
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
@@ -20,6 +22,12 @@ var (
|
||||
Help: "The number of peers subscribed to a given topic.",
|
||||
}, []string{"topic"},
|
||||
)
|
||||
subscribedTopicPeerCount = promauto.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "p2p_subscribed_topic_peer_count",
|
||||
Help: "The number of peers subscribed to topics that a host node is also subscribed to.",
|
||||
}, []string{"topic"},
|
||||
)
|
||||
messageReceivedCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "p2p_message_received_total",
|
||||
@@ -34,6 +42,13 @@ var (
|
||||
},
|
||||
[]string{"topic"},
|
||||
)
|
||||
messageIgnoredValidationCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "p2p_message_ignored_validation_total",
|
||||
Help: "Count of messages that were ignored in validation.",
|
||||
},
|
||||
[]string{"topic"},
|
||||
)
|
||||
messageFailedProcessingCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "p2p_message_failed_processing_total",
|
||||
@@ -69,24 +84,35 @@ func (s *Service) updateMetrics() {
|
||||
log.WithError(err).Debugf("Could not compute fork digest")
|
||||
}
|
||||
indices := s.aggregatorSubnetIndices(s.cfg.Chain.CurrentSlot())
|
||||
syncIndices := cache.SyncSubnetIDs.GetAllSubnets(helpers.SlotToEpoch(s.cfg.Chain.CurrentSlot()))
|
||||
attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})]
|
||||
syncTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.SyncCommitteeMessage{})]
|
||||
attTopic += s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
syncTopic += s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
|
||||
formattedTopic := fmt.Sprintf(attTopic, digest, i)
|
||||
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
|
||||
}
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
formattedTopic := fmt.Sprintf(syncTopic, digest, i)
|
||||
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
|
||||
}
|
||||
} else {
|
||||
for _, committeeIdx := range indices {
|
||||
formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx)
|
||||
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
|
||||
}
|
||||
for _, committeeIdx := range syncIndices {
|
||||
formattedTopic := fmt.Sprintf(syncTopic, digest, committeeIdx)
|
||||
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
|
||||
}
|
||||
}
|
||||
|
||||
// We update all other gossip topics.
|
||||
for _, topic := range p2p.AllTopics() {
|
||||
// We already updated attestation subnet topics.
|
||||
if strings.Contains(topic, "beacon_attestation") {
|
||||
if strings.Contains(topic, p2p.GossipAttestationMessage) || strings.Contains(topic, p2p.GossipSyncCommitteeMessage) {
|
||||
continue
|
||||
}
|
||||
topic += s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
@@ -97,4 +123,8 @@ func (s *Service) updateMetrics() {
|
||||
formattedTopic := fmt.Sprintf(topic, digest)
|
||||
topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(formattedTopic))))
|
||||
}
|
||||
|
||||
for _, topic := range s.cfg.P2P.PubSub().GetTopics() {
|
||||
subscribedTopicPeerCount.WithLabelValues(topic).Set(float64(len(s.cfg.P2P.PubSub().ListPeers(topic))))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,11 +6,14 @@ import (
|
||||
libp2pcore "github.com/libp2p/go-libp2p-core"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2putils"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
@@ -36,14 +39,8 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
|
||||
}
|
||||
return nilErr
|
||||
}
|
||||
topicVersion := ""
|
||||
switch s.cfg.P2P.Metadata().Version() {
|
||||
case version.Phase0:
|
||||
topicVersion = p2p.SchemaVersionV1
|
||||
case version.Altair:
|
||||
topicVersion = p2p.SchemaVersionV2
|
||||
}
|
||||
if err := validateVersion(topicVersion, stream); err != nil {
|
||||
_, _, streamVersion, err := p2p.TopicDeconstructor(string(stream.Protocol()))
|
||||
if err != nil {
|
||||
resp, genErr := s.generateErrorResponse(responseCodeServerError, types.ErrGeneric.Error())
|
||||
if genErr != nil {
|
||||
log.WithError(genErr).Debug("Could not generate a response error")
|
||||
@@ -52,10 +49,34 @@ func (s *Service) metaDataHandler(_ context.Context, _ interface{}, stream libp2
|
||||
}
|
||||
return err
|
||||
}
|
||||
currMd := s.cfg.P2P.Metadata()
|
||||
switch streamVersion {
|
||||
case p2p.SchemaVersionV1:
|
||||
// We have a v1 metadata object saved locally, so we
|
||||
// convert it back to a v0 metadata object.
|
||||
if currMd.Version() != version.Phase0 {
|
||||
currMd = wrapper.WrappedMetadataV0(
|
||||
&pb.MetaDataV0{
|
||||
Attnets: currMd.AttnetsBitfield(),
|
||||
SeqNumber: currMd.SequenceNumber(),
|
||||
})
|
||||
}
|
||||
case p2p.SchemaVersionV2:
|
||||
// We have a v0 metadata object saved locally, so we
|
||||
// convert it to a v1 metadata object.
|
||||
if currMd.Version() != version.Altair {
|
||||
currMd = wrapper.WrappedMetadataV1(
|
||||
&pb.MetaDataV1{
|
||||
Attnets: currMd.AttnetsBitfield(),
|
||||
SeqNumber: currMd.SequenceNumber(),
|
||||
Syncnets: bitfield.Bitvector4{byte(0x00)},
|
||||
})
|
||||
}
|
||||
}
|
||||
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := s.cfg.P2P.Encoding().EncodeWithMaxLength(stream, s.cfg.P2P.Metadata())
|
||||
_, err = s.cfg.P2P.Encoding().EncodeWithMaxLength(stream, currMd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -17,7 +16,6 @@ import (
|
||||
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/metadata"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
|
||||
@@ -189,11 +187,11 @@ func TestMetadataRPCHandler_SendsMetadataAltair(t *testing.T) {
|
||||
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
|
||||
defer wg.Done()
|
||||
err := r2.metaDataHandler(context.Background(), new(interface{}), stream)
|
||||
assert.ErrorContains(t, fmt.Sprintf("stream version of %s doesn't match provided version %s", p2p.SchemaVersionV2, p2p.SchemaVersionV1), err)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
_, err := r.sendMetaDataRequest(context.Background(), p2.BHost.ID())
|
||||
assert.ErrorContains(t, types.ErrGeneric.Error(), err)
|
||||
assert.NoError(t, err)
|
||||
|
||||
if testutil.WaitTimeout(&wg, 1*time.Second) {
|
||||
t.Fatal("Did not receive stream within 1 sec")
|
||||
|
||||
@@ -178,10 +178,7 @@ func (s *Service) Stop() error {
|
||||
}
|
||||
// Deregister Topic Subscribers.
|
||||
for _, t := range s.cfg.P2P.PubSub().GetTopics() {
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(t); err != nil {
|
||||
log.Errorf("Could not successfully unregister for topic %s: %v", t, err)
|
||||
}
|
||||
s.subHandler.removeTopic(t)
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
defer s.cancel()
|
||||
return nil
|
||||
|
||||
@@ -213,6 +213,7 @@ func (s *Service) subscribeWithBase(topic string, validator pubsub.ValidatorEx,
|
||||
}
|
||||
|
||||
go messageLoop()
|
||||
log.WithField("topic", topic).Info("Subscribed to topic")
|
||||
return sub
|
||||
}
|
||||
|
||||
@@ -231,13 +232,30 @@ func (s *Service) wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (s
|
||||
}
|
||||
// Ignore any messages received before chainstart.
|
||||
if s.chainStarted.IsNotSet() {
|
||||
messageFailedValidationCounter.WithLabelValues(topic).Inc()
|
||||
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
retDigest, err := p2p.ExtractGossipDigest(topic)
|
||||
if err != nil {
|
||||
log.WithField("topic", topic).Errorf("Invalid topic format of pubsub topic: %v", err)
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
currDigest, err := s.currentForkDigest()
|
||||
if err != nil {
|
||||
log.WithField("topic", topic).Errorf("Unable to retrieve fork data: %v", err)
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
if currDigest != retDigest {
|
||||
log.WithField("topic", topic).Warnf("Received message from outdated fork digest %#x", retDigest)
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
b := v(ctx, pid, msg)
|
||||
if b == pubsub.ValidationReject {
|
||||
messageFailedValidationCounter.WithLabelValues(topic).Inc()
|
||||
}
|
||||
if b == pubsub.ValidationIgnore {
|
||||
messageIgnoredValidationCounter.WithLabelValues(topic).Inc()
|
||||
}
|
||||
return b
|
||||
}
|
||||
}
|
||||
@@ -280,10 +298,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator pubsub.Vali
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
s.subHandler.removeTopic(fullTopic)
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
@@ -386,10 +401,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
|
||||
if !wanted && v != nil {
|
||||
v.Cancel()
|
||||
fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
s.subHandler.removeTopic(fullTopic)
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
delete(subscriptions, k)
|
||||
}
|
||||
}
|
||||
@@ -485,10 +497,7 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator pubsub.
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.P2P.Encoding().ProtocolSuffix()
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
s.subHandler.removeTopic(fullTopic)
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
@@ -588,6 +597,21 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) unSubscribeFromTopic(topic string) {
|
||||
log.WithField("topic", topic).Debug("Unsubscribing from topic")
|
||||
if err := s.cfg.P2P.PubSub().UnregisterTopicValidator(topic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
sub := s.subHandler.subForTopic(topic)
|
||||
if sub != nil {
|
||||
sub.Cancel()
|
||||
}
|
||||
s.subHandler.removeTopic(topic)
|
||||
if err := s.cfg.P2P.LeaveTopic(topic); err != nil {
|
||||
log.WithError(err).Error("Unable to leave topic")
|
||||
}
|
||||
}
|
||||
|
||||
// find if we have peers who are subscribed to the same subnet
|
||||
func (s *Service) validPeersExist(subnetTopic string) bool {
|
||||
numOfPeers := s.cfg.P2P.PubSub().ListPeers(subnetTopic + s.cfg.P2P.Encoding().ProtocolSuffix())
|
||||
|
||||
@@ -18,11 +18,13 @@ import (
|
||||
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
|
||||
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/shared/abool"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/p2putils"
|
||||
"github.com/prysmaticlabs/prysm/shared/params"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
@@ -280,6 +282,13 @@ func TestStaticSubnets(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_wrapAndReportValidation(t *testing.T) {
|
||||
mChain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{0x01},
|
||||
}
|
||||
fd, err := p2putils.CreateForkDigest(mChain.GenesisTime(), mChain.ValidatorsRoot[:])
|
||||
assert.NoError(t, err)
|
||||
mockTopic := fmt.Sprintf(p2p.BlockSubnetTopicFormat, fd) + encoder.SszNetworkEncoder{}.ProtocolSuffix()
|
||||
type args struct {
|
||||
topic string
|
||||
v pubsub.ValidatorEx
|
||||
@@ -333,7 +342,7 @@ func Test_wrapAndReportValidation(t *testing.T) {
|
||||
{
|
||||
name: "validator OK",
|
||||
args: args{
|
||||
topic: "foo",
|
||||
topic: mockTopic,
|
||||
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
|
||||
return pubsub.ValidationAccept
|
||||
},
|
||||
@@ -341,7 +350,7 @@ func Test_wrapAndReportValidation(t *testing.T) {
|
||||
msg: &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Topic: func() *string {
|
||||
s := "foo"
|
||||
s := mockTopic
|
||||
return &s
|
||||
}(),
|
||||
},
|
||||
@@ -371,6 +380,9 @@ func Test_wrapAndReportValidation(t *testing.T) {
|
||||
chainStarted.SetTo(tt.args.chainstarted)
|
||||
s := &Service{
|
||||
chainStarted: chainStarted,
|
||||
cfg: &Config{
|
||||
Chain: mChain,
|
||||
},
|
||||
}
|
||||
_, v := s.wrapAndReportValidation(tt.args.topic, tt.args.v)
|
||||
got := v(context.Background(), tt.args.pid, tt.args.msg)
|
||||
|
||||
@@ -82,3 +82,9 @@ func (s *subTopicHandler) allTopics() []string {
|
||||
}
|
||||
return topics
|
||||
}
|
||||
|
||||
func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
return s.subTopics[topic]
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
|
||||
}
|
||||
|
||||
// Handle block when the parent is unknown.
|
||||
if !s.cfg.DB.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) {
|
||||
if !s.cfg.DB.HasBlock(ctx, bytesutil.ToBytes32(blk.Block().ParentRoot())) && !s.cfg.Chain.HasInitSyncBlock(bytesutil.ToBytes32(blk.Block().ParentRoot())) {
|
||||
s.pendingQueueLock.Lock()
|
||||
if err := s.insertBlockToPendingQueue(blk.Block().Slot(), blk, blockRoot); err != nil {
|
||||
s.pendingQueueLock.Unlock()
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
||||
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
|
||||
@@ -54,7 +55,7 @@ func (s *Service) validateSyncCommitteeMessage(ctx context.Context, pid peer.ID,
|
||||
}
|
||||
|
||||
// The message's `slot` is for the current slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
|
||||
if err := helpers.VerifySlotTime(uint64(s.cfg.Chain.GenesisTime().Unix()), m.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
|
||||
if err := altair.ValidateSyncMessageTime(m.Slot, s.cfg.Chain.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
|
||||
traceutil.AnnotateError(span, err)
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
@@ -533,7 +533,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
|
||||
want: pubsub.ValidationReject,
|
||||
},
|
||||
{
|
||||
name: "Valid Signed Sync Contribution And Proof",
|
||||
name: "Invalid Signed Sync Contribution And Proof - Zero Bits Set",
|
||||
svc: NewService(context.Background(), &Config{
|
||||
P2P: mockp2p.NewTestP2P(t),
|
||||
InitialSync: &mockSync.Sync{IsSyncing: false},
|
||||
@@ -596,6 +596,102 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
|
||||
assert.NoError(t, s.initCaches())
|
||||
return s
|
||||
},
|
||||
args: args{
|
||||
ctx: context.Background(),
|
||||
pid: "random",
|
||||
topic: defaultTopic,
|
||||
msg: ðpb.SignedContributionAndProof{
|
||||
Message: ðpb.ContributionAndProof{
|
||||
AggregatorIndex: 1,
|
||||
Contribution: ðpb.SyncCommitteeContribution{
|
||||
Slot: 1,
|
||||
SubcommitteeIndex: 1,
|
||||
BlockRoot: params.BeaconConfig().ZeroHash[:],
|
||||
AggregationBits: bitfield.NewBitvector128(),
|
||||
Signature: emptySig[:],
|
||||
},
|
||||
SelectionProof: emptySig[:],
|
||||
},
|
||||
Signature: emptySig[:],
|
||||
}},
|
||||
want: pubsub.ValidationReject,
|
||||
},
|
||||
{
|
||||
name: "Valid Signed Sync Contribution And Proof - Single Bit Set",
|
||||
svc: NewService(context.Background(), &Config{
|
||||
P2P: mockp2p.NewTestP2P(t),
|
||||
InitialSync: &mockSync.Sync{IsSyncing: false},
|
||||
Chain: chainService,
|
||||
StateNotifier: chainService.StateNotifier(),
|
||||
OperationNotifier: chainService.OperationNotifier(),
|
||||
}),
|
||||
setupSvc: func(s *Service, msg *ethpb.SignedContributionAndProof) *Service {
|
||||
s.cfg.StateGen = stategen.New(db)
|
||||
msg.Message.Contribution.BlockRoot = headRoot[:]
|
||||
s.cfg.DB = db
|
||||
hState, err := db.State(context.Background(), headRoot)
|
||||
assert.NoError(t, err)
|
||||
sc, err := hState.CurrentSyncCommittee()
|
||||
assert.NoError(t, err)
|
||||
cd, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(helpers.PrevSlot(hState.Slot())), params.BeaconConfig().DomainContributionAndProof, hState.GenesisValidatorRoot())
|
||||
assert.NoError(t, err)
|
||||
d, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(hState.Slot()), params.BeaconConfig().DomainSyncCommittee, hState.GenesisValidatorRoot())
|
||||
assert.NoError(t, err)
|
||||
var pubkeys [][]byte
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
coms, err := altair.SyncSubCommitteePubkeys(sc, types.CommitteeIndex(i))
|
||||
pubkeys = coms
|
||||
assert.NoError(t, err)
|
||||
for _, p := range coms {
|
||||
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
|
||||
assert.Equal(t, true, ok)
|
||||
rt, err := syncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
|
||||
assert.NoError(t, err)
|
||||
sig := keys[idx].Sign(rt[:])
|
||||
isAggregator, err := altair.IsSyncCommitteeAggregator(sig.Marshal())
|
||||
require.NoError(t, err)
|
||||
if isAggregator {
|
||||
msg.Message.AggregatorIndex = idx
|
||||
msg.Message.SelectionProof = sig.Marshal()
|
||||
msg.Message.Contribution.Slot = helpers.PrevSlot(hState.Slot())
|
||||
msg.Message.Contribution.SubcommitteeIndex = i
|
||||
msg.Message.Contribution.BlockRoot = headRoot[:]
|
||||
msg.Message.Contribution.AggregationBits = bitfield.NewBitvector128()
|
||||
// Only Sign for 1 validator.
|
||||
rawBytes := p2ptypes.SSZBytes(headRoot[:])
|
||||
sigRoot, err := helpers.ComputeSigningRoot(&rawBytes, d)
|
||||
assert.NoError(t, err)
|
||||
valIdx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(coms[0]))
|
||||
assert.Equal(t, true, ok)
|
||||
sig = keys[valIdx].Sign(sigRoot[:])
|
||||
msg.Message.Contribution.AggregationBits.SetBitAt(uint64(0), true)
|
||||
msg.Message.Contribution.Signature = sig.Marshal()
|
||||
|
||||
sigRoot, err = helpers.ComputeSigningRoot(msg.Message, cd)
|
||||
assert.NoError(t, err)
|
||||
contrSig := keys[idx].Sign(sigRoot[:])
|
||||
msg.Signature = contrSig.Marshal()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pd, err := helpers.Domain(hState.Fork(), helpers.SlotToEpoch(helpers.PrevSlot(hState.Slot())), params.BeaconConfig().DomainSyncCommitteeSelectionProof, hState.GenesisValidatorRoot())
|
||||
require.NoError(t, err)
|
||||
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
|
||||
s.cfg.Chain = &mockChain.ChainService{
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(hState.Slot()-1)),
|
||||
CurrentSyncCommitteeIndices: []types.CommitteeIndex{types.CommitteeIndex(msg.Message.Contribution.SubcommitteeIndex * subCommitteeSize)},
|
||||
PublicKey: bytesutil.ToBytes48(keys[msg.Message.AggregatorIndex].PublicKey().Marshal()),
|
||||
SyncSelectionProofDomain: pd,
|
||||
SyncContributionProofDomain: cd,
|
||||
SyncCommitteeDomain: d,
|
||||
SyncCommitteePubkeys: pubkeys,
|
||||
}
|
||||
assert.NoError(t, s.initCaches())
|
||||
return s
|
||||
},
|
||||
args: args{
|
||||
ctx: context.Background(),
|
||||
pid: "random",
|
||||
|
||||
@@ -349,6 +349,8 @@ func TestSubmitSignedContributionAndProof_CouldNotSubmitContribution(t *testing.
|
||||
SignatureDomain: make([]byte, 32),
|
||||
}, nil)
|
||||
|
||||
aggBits := bitfield.NewBitvector128()
|
||||
aggBits.SetBitAt(0, true)
|
||||
m.validatorClient.EXPECT().GetSyncCommitteeContribution(
|
||||
gomock.Any(), // ctx
|
||||
ðpb.SyncCommitteeContributionRequest{
|
||||
@@ -359,7 +361,7 @@ func TestSubmitSignedContributionAndProof_CouldNotSubmitContribution(t *testing.
|
||||
).Return(ðpb.SyncCommitteeContribution{
|
||||
BlockRoot: make([]byte, 32),
|
||||
Signature: make([]byte, 96),
|
||||
AggregationBits: bitfield.NewBitvector128(),
|
||||
AggregationBits: aggBits,
|
||||
}, nil)
|
||||
|
||||
m.validatorClient.EXPECT().
|
||||
@@ -425,6 +427,8 @@ func TestSubmitSignedContributionAndProof_Ok(t *testing.T) {
|
||||
SignatureDomain: make([]byte, 32),
|
||||
}, nil)
|
||||
|
||||
aggBits := bitfield.NewBitvector128()
|
||||
aggBits.SetBitAt(0, true)
|
||||
m.validatorClient.EXPECT().GetSyncCommitteeContribution(
|
||||
gomock.Any(), // ctx
|
||||
ðpb.SyncCommitteeContributionRequest{
|
||||
@@ -435,7 +439,7 @@ func TestSubmitSignedContributionAndProof_Ok(t *testing.T) {
|
||||
).Return(ðpb.SyncCommitteeContribution{
|
||||
BlockRoot: make([]byte, 32),
|
||||
Signature: make([]byte, 96),
|
||||
AggregationBits: bitfield.NewBitvector128(),
|
||||
AggregationBits: aggBits,
|
||||
}, nil)
|
||||
|
||||
m.validatorClient.EXPECT().
|
||||
|
||||
Reference in New Issue
Block a user