Add support for sync committee selections (#13633)

* add support for sync committee selections

* go mod tidy

* remove unused fields

* fix build

* fix build

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
Dhruv Bodani
2024-02-23 19:23:42 +05:30
committed by GitHub
parent 789c3f8078
commit e100fb0c08
13 changed files with 368 additions and 12 deletions

View File

@@ -36,6 +36,7 @@ go_library(
"submit_signed_contribution_and_proof.go",
"subscribe_committee_subnets.go",
"sync_committee.go",
"sync_committee_selections.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/validator/client/beacon-api",
visibility = ["//validator:__subpackages__"],
@@ -107,6 +108,7 @@ go_test(
"submit_signed_aggregate_proof_test.go",
"submit_signed_contribution_and_proof_test.go",
"subscribe_committee_subnets_test.go",
"sync_committee_selections_test.go",
"sync_committee_test.go",
"validator_count_test.go",
"wait_for_chain_start_test.go",

View File

@@ -179,3 +179,7 @@ func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
func (c *beaconApiValidatorClient) GetAggregatedSelections(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
return c.getAggregatedSelection(ctx, selections)
}
func (c *beaconApiValidatorClient) GetAggregatedSyncSelections(ctx context.Context, selections []iface.SyncCommitteeSelection) ([]iface.SyncCommitteeSelection, error) {
return c.getAggregatedSyncSelections(ctx, selections)
}

View File

@@ -0,0 +1,35 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
)
type aggregatedSyncSelectionResponse struct {
Data []iface.SyncCommitteeSelection `json:"data"`
}
func (c *beaconApiValidatorClient) getAggregatedSyncSelections(ctx context.Context, selections []iface.SyncCommitteeSelection) ([]iface.SyncCommitteeSelection, error) {
body, err := json.Marshal(selections)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal selections")
}
var resp aggregatedSyncSelectionResponse
err = c.jsonRestHandler.Post(ctx, "/eth/v1/validator/sync_committee_selections", nil, bytes.NewBuffer(body), &resp)
if err != nil {
return nil, errors.Wrap(err, "error calling post endpoint")
}
if len(resp.Data) == 0 {
return nil, errors.New("no aggregated sync selections returned")
}
if len(selections) != len(resp.Data) {
return nil, errors.New("mismatching number of sync selections")
}
return resp.Data, nil
}

View File

@@ -0,0 +1,130 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"testing"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/validator/client/beacon-api/mock"
test_helpers "github.com/prysmaticlabs/prysm/v5/validator/client/beacon-api/test-helpers"
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"go.uber.org/mock/gomock"
)
func TestGetAggregatedSyncSelections(t *testing.T) {
testcases := []struct {
name string
req []iface.SyncCommitteeSelection
res []iface.SyncCommitteeSelection
endpointError error
expectedErrorMessage string
}{
{
name: "valid",
req: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
},
res: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 100),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
},
},
{
name: "endpoint error",
req: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
},
endpointError: errors.New("bad request"),
expectedErrorMessage: "bad request",
},
{
name: "no response error",
req: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
},
expectedErrorMessage: "no aggregated sync selections returned",
},
{
name: "mismatch response",
req: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
{
SelectionProof: test_helpers.FillByteSlice(96, 100),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 78,
},
},
res: []iface.SyncCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 100),
Slot: 75,
ValidatorIndex: 76,
SubcommitteeIndex: 77,
},
},
expectedErrorMessage: "mismatching number of sync selections",
},
}
for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
reqBody, err := json.Marshal(test.req)
require.NoError(t, err)
ctx := context.Background()
jsonRestHandler.EXPECT().Post(
ctx,
"/eth/v1/validator/sync_committee_selections",
nil,
bytes.NewBuffer(reqBody),
&aggregatedSyncSelectionResponse{},
).SetArg(
4,
aggregatedSyncSelectionResponse{Data: test.res},
).Return(
test.endpointError,
).Times(1)
validatorClient := &beaconApiValidatorClient{jsonRestHandler: jsonRestHandler}
res, err := validatorClient.GetAggregatedSyncSelections(ctx, test.req)
if test.expectedErrorMessage != "" {
require.ErrorContains(t, test.expectedErrorMessage, err)
return
}
require.NoError(t, err)
require.DeepEqual(t, test.res, res)
})
}
}

View File

@@ -142,6 +142,10 @@ func (grpcValidatorClient) GetAggregatedSelections(context.Context, []iface.Beac
return nil, iface.ErrNotSupported
}
func (grpcValidatorClient) GetAggregatedSyncSelections(context.Context, []iface.SyncCommitteeSelection) ([]iface.SyncCommitteeSelection, error) {
return nil, iface.ErrNotSupported
}
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)}
}

View File

@@ -62,6 +62,64 @@ func (b *BeaconCommitteeSelection) UnmarshalJSON(input []byte) error {
return nil
}
type SyncCommitteeSelection struct {
SelectionProof []byte
Slot primitives.Slot
SubcommitteeIndex primitives.CommitteeIndex
ValidatorIndex primitives.ValidatorIndex
}
type syncCommitteeSelectionJson struct {
SelectionProof string `json:"selection_proof"`
Slot string `json:"slot"`
SubcommitteeIndex string `json:"subcommittee_index"`
ValidatorIndex string `json:"validator_index"`
}
func (s SyncCommitteeSelection) MarshalJSON() ([]byte, error) {
return json.Marshal(syncCommitteeSelectionJson{
SelectionProof: hexutil.Encode(s.SelectionProof),
Slot: strconv.FormatUint(uint64(s.Slot), 10),
SubcommitteeIndex: strconv.FormatUint(uint64(s.SubcommitteeIndex), 10),
ValidatorIndex: strconv.FormatUint(uint64(s.ValidatorIndex), 10),
})
}
func (s *SyncCommitteeSelection) UnmarshalJSON(input []byte) error {
var resJson syncCommitteeSelectionJson
err := json.Unmarshal(input, &resJson)
if err != nil {
return errors.Wrap(err, "failed to unmarshal sync committee selection")
}
slot, err := strconv.ParseUint(resJson.Slot, 10, 64)
if err != nil {
return errors.Wrap(err, "failed to parse slot")
}
vIdx, err := strconv.ParseUint(resJson.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrap(err, "failed to parse validator index")
}
subcommIdx, err := strconv.ParseUint(resJson.SubcommitteeIndex, 10, 64)
if err != nil {
return errors.Wrap(err, "failed to parse subcommittee index")
}
selectionProof, err := hexutil.Decode(resJson.SelectionProof)
if err != nil {
return errors.Wrap(err, "failed to parse selection proof")
}
s.Slot = primitives.Slot(slot)
s.SelectionProof = selectionProof
s.ValidatorIndex = primitives.ValidatorIndex(vIdx)
s.SubcommitteeIndex = primitives.CommitteeIndex(subcommIdx)
return nil
}
type ValidatorClient interface {
GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error)
DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error)
@@ -91,4 +149,5 @@ type ValidatorClient interface {
StartEventStream(ctx context.Context) error
EventStreamIsRunning() bool
GetAggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error)
GetAggregatedSyncSelections(ctx context.Context, selections []SyncCommitteeSelection) ([]SyncCommitteeSelection, error)
}

View File

@@ -6,6 +6,9 @@ import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
@@ -117,7 +120,7 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p
return
}
selectionProofs, err := v.selectionProofs(ctx, slot, pubKey, indexRes)
selectionProofs, err := v.selectionProofs(ctx, slot, pubKey, indexRes, duty.ValidatorIndex)
if err != nil {
log.WithError(err).Error("Could not get selection proofs")
return
@@ -188,11 +191,12 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p
}
// Signs and returns selection proofs per validator for slot and pub key.
func (v *validator) selectionProofs(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, indexRes *ethpb.SyncSubcommitteeIndexResponse) ([][]byte, error) {
func (v *validator) selectionProofs(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, indexRes *ethpb.SyncSubcommitteeIndexResponse, validatorIndex primitives.ValidatorIndex) ([][]byte, error) {
selectionProofs := make([][]byte, len(indexRes.Indices))
cfg := params.BeaconConfig()
size := cfg.SyncCommitteeSize
subCount := cfg.SyncCommitteeSubnetCount
selections := make([]iface.SyncCommitteeSelection, len(indexRes.Indices))
for i, index := range indexRes.Indices {
subSize := size / subCount
subnet := uint64(index) / subSize
@@ -201,7 +205,27 @@ func (v *validator) selectionProofs(ctx context.Context, slot primitives.Slot, p
return nil, err
}
selectionProofs[i] = selectionProof
selections[i] = iface.SyncCommitteeSelection{
SelectionProof: selectionProof,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: validatorIndex,
}
}
// Override selection proofs with aggregated ones if the node is part of a Distributed Validator.
if v.distributed && len(selections) > 0 {
var err error
selections, err := v.validatorClient.GetAggregatedSyncSelections(ctx, selections)
if err != nil {
return nil, errors.Wrap(err, "failed to get aggregated sync selections")
}
for i, s := range selections {
selectionProofs[i] = s.SelectionProof
}
}
return selectionProofs, nil
}

View File

@@ -753,7 +753,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
}
}
if inSyncCommittee {
aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey))
aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "could not check if a validator is a sync committee aggregator")
}
@@ -818,7 +818,7 @@ func (v *validator) isAggregator(ctx context.Context, committee []primitives.Val
//
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) (bool, error) {
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) {
res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
@@ -827,6 +827,7 @@ func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitiv
return false, err
}
var selections []iface.SyncCommitteeSelection
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
@@ -834,7 +835,25 @@ func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitiv
if err != nil {
return false, err
}
isAggregator, err := altair.IsSyncCommitteeAggregator(sig)
selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: validatorIndex,
})
}
// Override selections with aggregated ones if the node is part of a Distributed Validator.
if v.distributed && len(selections) > 0 {
selections, err = v.validatorClient.GetAggregatedSyncSelections(ctx, selections)
if err != nil {
return false, errors.Wrap(err, "failed to get aggregated sync selections")
}
}
for _, s := range selections {
isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof)
if err != nil {
return false, err
}

View File

@@ -1251,7 +1251,7 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey))
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
require.NoError(t, err)
require.Equal(t, false, aggregator)
@@ -1272,7 +1272,64 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey))
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
require.NoError(t, err)
require.Equal(t, true, aggregator)
}
func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) {
params.SetupTestConfigCleanup(t)
v, m, validatorKey, finish := setup(t)
defer finish()
v.distributed = true
slot := primitives.Slot(1)
pubKey := validatorKey.PublicKey().Marshal()
m.validatorClient.EXPECT().GetSyncSubcommitteeIndex(
gomock.Any(), // ctx
&ethpb.SyncSubcommitteeIndexRequest{
PublicKey: validatorKey.PublicKey().Marshal(),
Slot: 1,
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
require.NoError(t, err)
require.Equal(t, false, aggregator)
c := params.BeaconConfig().Copy()
c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64
params.OverrideBeaconConfig(c)
m.validatorClient.EXPECT().DomainData(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(&ethpb.DomainResponse{SignatureDomain: make([]byte, 32)}, nil /*err*/).Times(2)
m.validatorClient.EXPECT().GetSyncSubcommitteeIndex(
gomock.Any(), // ctx
&ethpb.SyncSubcommitteeIndexRequest{
PublicKey: validatorKey.PublicKey().Marshal(),
Slot: 1,
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/)
sig, err := v.signSyncSelectionData(context.Background(), bytesutil.ToBytes48(pubKey), 0, slot)
require.NoError(t, err)
selection := iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: 1,
ValidatorIndex: 123,
SubcommitteeIndex: 0,
}
m.validatorClient.EXPECT().GetAggregatedSyncSelections(
gomock.Any(), // ctx
[]iface.SyncCommitteeSelection{selection},
).Return([]iface.SyncCommitteeSelection{selection}, nil)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 123)
require.NoError(t, err)
require.Equal(t, true, aggregator)
}