Clean up sync committee p2p pipelines (#9153)

This commit is contained in:
terence tsao
2021-07-07 07:35:01 -07:00
committed by GitHub
parent 47367d98b4
commit 2bb3da1ba3
11 changed files with 21 additions and 48 deletions

View File

@@ -291,7 +291,7 @@ func IsSyncCommitteeAggregator(sig []byte) bool {
return bytesutil.FromBytes8(hashedSig[:8])%modulo == 0
}
// SyncCommitteeSigningRoot returns the signing root from the relevant provided data.
// SyncSelectionProofSigningRoot returns the signing root from the relevant provided data.
//
// def get_sync_committee_selection_proof(state: BeaconState,
// slot: Slot,
@@ -304,7 +304,7 @@ func IsSyncCommitteeAggregator(sig []byte) bool {
// )
// signing_root = compute_signing_root(signing_data, domain)
// return bls.Sign(privkey, signing_root)
func SyncCommitteeSigningRoot(st iface.BeaconState, slot types.Slot, comIdx types.CommitteeIndex) ([32]byte, error) {
func SyncSelectionProofSigningRoot(st iface.BeaconState, slot types.Slot, comIdx types.CommitteeIndex) ([32]byte, error) {
dom, err := helpers.Domain(st.Fork(), helpers.SlotToEpoch(slot), params.BeaconConfig().DomainSyncCommitteeSelectionProof, st.GenesisValidatorRoot())
if err != nil {
return [32]byte{}, err

View File

@@ -29,7 +29,7 @@ go_library(
"subscriber_beacon_attestation.go",
"subscriber_beacon_blocks.go",
"subscriber_handlers.go",
"subscriber_sync_committee.go",
"subscriber_sync_committee_message.go",
"subscriber_sync_contribution_proof.go",
"utils.go",
"validate_aggregate_proof.go",
@@ -37,7 +37,7 @@ go_library(
"validate_beacon_attestation.go",
"validate_beacon_blocks.go",
"validate_proposer_slashing.go",
"validate_sync_committee.go",
"validate_sync_committee_message.go",
"validate_sync_contrbution_proof.go",
"validate_voluntary_exit.go",
],

View File

@@ -10,7 +10,7 @@ import (
)
// skipcq: SCC-U1000
func (s *Service) syncCommitteeSubscriber(_ context.Context, msg proto.Message) error {
func (s *Service) syncCommitteeMessageSubscriber(_ context.Context, msg proto.Message) error {
m, ok := msg.(*prysmv2.SyncCommitteeMessage)
if !ok {
return fmt.Errorf("message was not type *eth.SyncCommitteeMessage, type=%T", msg)

View File

@@ -98,7 +98,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
return pubsub.ValidationReject
}
// Verify the block being voted and the processed state is in DB and. The block should have passed validation if it's in the DB.
// Verify the block being voted and the processed state is in DB and the block has passed validation if it's in the DB.
blockRoot := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.

View File

@@ -10,8 +10,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
@@ -24,7 +22,7 @@ import (
"go.opencensus.io/trace"
)
func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
func (s *Service) validateSyncCommitteeMessage(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
if pid == s.cfg.P2P.PeerID() {
return pubsub.ValidationAccept
}
@@ -33,7 +31,7 @@ func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *p
if s.cfg.InitialSync.Syncing() {
return pubsub.ValidationIgnore
}
ctx, span := trace.StartSpan(ctx, "sync.validateSyncCommittee")
ctx, span := trace.StartSpan(ctx, "sync.validateSyncCommitteeMessage")
defer span.End()
if msg.Topic == nil {
@@ -62,15 +60,6 @@ func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *p
return pubsub.ValidationReject
}
// Broadcast the sync committee on a feed to notify other services in the beacon node
// of a received sync committee.
s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SyncCommMessageReceived,
Data: &operation.SyncCommReceivedData{
Message: comMsg,
},
})
if err := helpers.VerifySlotTime(uint64(s.cfg.Chain.GenesisTime().Unix()), comMsg.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
traceutil.AnnotateError(span, err)
return pubsub.ValidationIgnore
@@ -80,7 +69,7 @@ func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *p
return pubsub.ValidationIgnore
}
// Verify the block being voted and the processed state is in DB and. The block should have passed validation if it's in the DB.
// Verify the block being voted and the processed state is in DB and the block has passed validation if it's in the DB.
blockRoot := bytesutil.ToBytes32(comMsg.BlockRoot)
if !s.hasBlockAndState(ctx, blockRoot) {
return pubsub.ValidationIgnore
@@ -99,7 +88,7 @@ func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *p
return pubsub.ValidationReject
}
// Check for validity of validator index.
_, err = bState.ValidatorAtIndexReadOnly(comMsg.ValidatorIndex)
val, err := bState.ValidatorAtIndexReadOnly(comMsg.ValidatorIndex)
if err != nil {
traceutil.AnnotateError(span, err)
return pubsub.ValidationReject
@@ -127,11 +116,6 @@ func (s *Service) validateSyncCommittee(ctx context.Context, pid peer.ID, msg *p
return pubsub.ValidationReject
}
val, err := bState.ValidatorAtIndexReadOnly(comMsg.ValidatorIndex)
if err != nil {
traceutil.AnnotateError(span, err)
return pubsub.ValidationIgnore
}
d, err := helpers.Domain(bState.Fork(), helpers.SlotToEpoch(bState.Slot()), params.BeaconConfig().DomainSyncCommittee, bState.GenesisValidatorRoot())
if err != nil {
traceutil.AnnotateError(span, err)

View File

@@ -378,7 +378,7 @@ func TestService_ValidateSyncCommittee(t *testing.T) {
ReceivedFrom: "",
ValidatorData: nil,
}
if got := tt.svc.validateSyncCommittee(tt.args.ctx, tt.args.pid, msg); got != tt.want {
if got := tt.svc.validateSyncCommitteeMessage(tt.args.ctx, tt.args.pid, msg); got != tt.want {
t.Errorf("validateSyncContributionAndProof() = %v, want %v", got, tt.want)
}
})

View File

@@ -8,8 +8,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
iface "github.com/prysmaticlabs/prysm/beacon-chain/state/interface"
@@ -55,15 +53,6 @@ func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer
return pubsub.ValidationReject
}
// Broadcast the aggregated attestation on a feed to notify other services in the beacon node
// of a received aggregated attestation.
s.cfg.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.SyncContributionReceived,
Data: &operation.SyncContributionReceivedData{
Contribution: m.Message,
},
})
if err := helpers.VerifySlotTime(uint64(s.cfg.Chain.GenesisTime().Unix()), m.Message.Contribution.Slot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
traceutil.AnnotateError(span, err)
return pubsub.ValidationIgnore
@@ -168,7 +157,7 @@ func (s *Service) validateSyncContributionAndProof(ctx context.Context, pid peer
return pubsub.ValidationAccept
}
// Returns true if the node has received sync contribution for the aggregator with index,slot and subcommittee index.
// Returns true if the node has received sync contribution for the aggregator with index, slot and subcommittee index.
func (s *Service) hasSeenSyncContributionIndexSlot(slot types.Slot, aggregatorIndex types.ValidatorIndex, subComIdx types.CommitteeIndex) bool {
s.seenSyncContributionLock.RLock()
defer s.seenSyncContributionLock.RUnlock()

View File

@@ -237,7 +237,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if !altair.IsSyncCommitteeAggregator(sig.Marshal()) {
@@ -289,7 +289,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if altair.IsSyncCommitteeAggregator(sig.Marshal()) {
@@ -347,7 +347,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if altair.IsSyncCommitteeAggregator(sig.Marshal()) {
@@ -413,7 +413,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if altair.IsSyncCommitteeAggregator(sig.Marshal()) {
@@ -489,7 +489,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if altair.IsSyncCommitteeAggregator(sig.Marshal()) {
@@ -561,7 +561,7 @@ func TestService_ValidateSyncContributionAndProof(t *testing.T) {
for _, p := range coms {
idx, ok := hState.ValidatorIndexByPubkey(bytesutil.ToBytes48(p))
assert.Equal(t, true, ok)
rt, err := altair.SyncCommitteeSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
rt, err := altair.SyncSelectionProofSigningRoot(hState, helpers.PrevSlot(hState.Slot()), types.CommitteeIndex(i))
assert.NoError(t, err)
sig := keys[idx].Sign(rt[:])
if altair.IsSyncCommitteeAggregator(sig.Marshal()) {

View File

@@ -11,7 +11,7 @@ go_library(
"deposits.go",
"helpers.go",
"state.go",
"sync_committee.go",
"sync_aggregate.go",
"wait_timeout.go",
],
importpath = "github.com/prysmaticlabs/prysm/shared/testutil",

View File

@@ -369,7 +369,7 @@ func GenerateFullBlockAltair(
slot = currentSlot + 1
}
syncAgg, err := generateSyncCommittees(bState, privs, parentRoot)
syncAgg, err := generateSyncAggregate(bState, privs, parentRoot)
if err != nil {
return nil, err
}

View File

@@ -14,7 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
)
func generateSyncCommittees(bState iface.BeaconState, privs []bls.SecretKey, parentRoot [32]byte) (*prysmv2.SyncAggregate, error) {
func generateSyncAggregate(bState iface.BeaconState, privs []bls.SecretKey, parentRoot [32]byte) (*prysmv2.SyncAggregate, error) {
st, ok := bState.(iface.BeaconStateAltair)
if !ok || bState.Version() == version.Phase0 {
return nil, errors.Errorf("state cannot be asserted to altair state")