Implement beacon committee selections (#13503)

* implement beacon committee selections

* fix build

* fix lint

* fix lint

* Update beacon-chain/rpc/eth/shared/structs.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/beacon-api/beacon_committee_selections.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/beacon-api/beacon_committee_selections.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/beacon-api/beacon_committee_selections.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* move beacon committee selection structs to validator module

* fix bazel build files

* add support for POST and GET endpoints for get state validators query

* add a handler to return error from beacon node

* move beacon committee selection to validator top-level module

* fix bazel

* re-arrange fields to fix lint

* fix TestServer_InitializeRoutes

* fix build and lint

* fix build and lint

* fix TestSubmitAggregateAndProof_Distributed

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
Dhruv Bodani
2024-02-05 21:13:51 +05:30
committed by GitHub
parent e2e7e84a96
commit 55a29a4670
32 changed files with 925 additions and 108 deletions

View File

@@ -53,13 +53,25 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
v.aggregatedSlotCommitteeIDCache.Add(k, true)
v.aggregatedSlotCommitteeIDCacheLock.Unlock()
slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot)
if err != nil {
log.WithError(err).Error("Could not sign slot")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()
var slotSig []byte
if v.distributed {
slotSig, err = v.getAttSelection(attSelectionKey{slot: slot, index: duty.ValidatorIndex})
if err != nil {
log.WithError(err).Error("Could not find aggregated selection proof")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()
}
return
}
} else {
slotSig, err = v.signSlotWithSelectionProof(ctx, pubKey, slot)
if err != nil {
log.WithError(err).Error("Could not sign slot")
if v.emitAccountMetrics {
ValidatorAggFailVec.WithLabelValues(fmtKey).Inc()
}
return
}
return
}
// As specified in spec, an aggregator should wait until two thirds of the way through slot

View File

@@ -5,6 +5,8 @@ import (
"errors"
"testing"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"github.com/golang/mock/gomock"
"github.com/prysmaticlabs/go-bitfield"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
@@ -116,6 +118,63 @@ func TestSubmitAggregateAndProof_Ok(t *testing.T) {
validator.SubmitAggregateAndProof(context.Background(), 0, pubKey)
}
func TestSubmitAggregateAndProof_Distributed(t *testing.T) {
validatorIdx := primitives.ValidatorIndex(123)
slot := primitives.Slot(456)
ctx := context.Background()
validator, m, validatorKey, finish := setup(t)
defer finish()
var pubKey [fieldparams.BLSPubkeyLength]byte
copy(pubKey[:], validatorKey.PublicKey().Marshal())
validator.duties = &ethpb.DutiesResponse{
CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{
{
PublicKey: validatorKey.PublicKey().Marshal(),
ValidatorIndex: validatorIdx,
AttesterSlot: slot,
},
},
}
validator.distributed = true
validator.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection)
validator.attSelections[attSelectionKey{
slot: slot,
index: 123,
}] = iface.BeaconCommitteeSelection{
SelectionProof: make([]byte, 96),
Slot: slot,
ValidatorIndex: validatorIdx,
}
m.validatorClient.EXPECT().SubmitAggregateSelectionProof(
gomock.Any(), // ctx
gomock.AssignableToTypeOf(&ethpb.AggregateSelectionRequest{}),
).Return(&ethpb.AggregateSelectionResponse{
AggregateAndProof: &ethpb.AggregateAttestationAndProof{
AggregatorIndex: 0,
Aggregate: util.HydrateAttestation(&ethpb.Attestation{
AggregationBits: make([]byte, 1),
}),
SelectionProof: make([]byte, 96),
},
}, nil)
m.validatorClient.EXPECT().DomainData(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(&ethpb.DomainResponse{SignatureDomain: make([]byte, 32)}, nil /*err*/)
m.validatorClient.EXPECT().SubmitSignedAggregateSelectionProof(
gomock.Any(), // ctx
gomock.AssignableToTypeOf(&ethpb.SignedAggregateSubmitRequest{}),
).Return(&ethpb.SignedAggregateSubmitResponse{AttestationDataRoot: make([]byte, 32)}, nil)
validator.SubmitAggregateAndProof(ctx, slot, pubKey)
}
func TestWaitForSlotTwoThird_WaitCorrectly(t *testing.T) {
validator, _, _, finish := setup(t)
defer finish()

View File

@@ -12,6 +12,7 @@ go_library(
"beacon_block_converter.go",
"beacon_block_json_helpers.go",
"beacon_block_proto_helpers.go",
"beacon_committee_selections.go",
"domain_data.go",
"doppelganger.go",
"duties.go",
@@ -77,6 +78,7 @@ go_test(
"beacon_block_converter_test.go",
"beacon_block_json_helpers_test.go",
"beacon_block_proto_helpers_test.go",
"beacon_committee_selections_test.go",
"domain_data_test.go",
"doppelganger_test.go",
"duties_test.go",

View File

@@ -290,6 +290,14 @@ func TestActivation_JsonResponseError(t *testing.T) {
errors.New("some specific json error"),
).Times(1)
jsonRestHandler.EXPECT().Get(
ctx,
gomock.Any(),
gomock.Any(),
).Return(
errors.New("some specific json error"),
).Times(1)
validatorClient := beaconApiValidatorClient{
stateValidatorsProvider: beaconApiStateValidatorsProvider{
jsonRestHandler: jsonRestHandler,

View File

@@ -175,3 +175,7 @@ func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
return c.eventHandler.running
}
func (c *beaconApiValidatorClient) GetAggregatedSelections(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
return c.getAggregatedSelection(ctx, selections)
}

View File

@@ -0,0 +1,36 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"github.com/pkg/errors"
)
type aggregatedSelectionResponse struct {
Data []iface.BeaconCommitteeSelection `json:"data"`
}
func (c *beaconApiValidatorClient) getAggregatedSelection(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
body, err := json.Marshal(selections)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal selections")
}
var resp aggregatedSelectionResponse
err = c.jsonRestHandler.Post(ctx, "/eth/v1/validator/beacon_committee_selections", nil, bytes.NewBuffer(body), &resp)
if err != nil {
return nil, errors.Wrap(err, "error calling post endpoint")
}
if len(resp.Data) == 0 {
return nil, errors.New("no aggregated selection returned")
}
if len(selections) != len(resp.Data) {
return nil, errors.New("mismatching number of selections")
}
return resp.Data, nil
}

View File

@@ -0,0 +1,124 @@
package beacon_api
import (
"bytes"
"context"
"encoding/json"
"testing"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"github.com/golang/mock/gomock"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/mock"
test_helpers "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/test-helpers"
)
func TestGetAggregatedSelections(t *testing.T) {
testcases := []struct {
name string
req []iface.BeaconCommitteeSelection
res []iface.BeaconCommitteeSelection
endpointError error
expectedErrorMessage string
}{
{
name: "valid",
req: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
},
},
res: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 100),
Slot: 75,
ValidatorIndex: 76,
},
},
},
{
name: "endpoint error",
req: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
},
},
endpointError: errors.New("bad request"),
expectedErrorMessage: "bad request",
},
{
name: "no response error",
req: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
},
},
expectedErrorMessage: "no aggregated selection returned",
},
{
name: "mismatch response",
req: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 82),
Slot: 75,
ValidatorIndex: 76,
},
{
SelectionProof: test_helpers.FillByteSlice(96, 102),
Slot: 75,
ValidatorIndex: 79,
},
},
res: []iface.BeaconCommitteeSelection{
{
SelectionProof: test_helpers.FillByteSlice(96, 100),
Slot: 75,
ValidatorIndex: 76,
},
},
expectedErrorMessage: "mismatching number of selections",
},
}
for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
reqBody, err := json.Marshal(test.req)
require.NoError(t, err)
ctx := context.Background()
jsonRestHandler.EXPECT().Post(
ctx,
"/eth/v1/validator/beacon_committee_selections",
nil,
bytes.NewBuffer(reqBody),
&aggregatedSelectionResponse{},
).SetArg(
4,
aggregatedSelectionResponse{Data: test.res},
).Return(
test.endpointError,
).Times(1)
validatorClient := &beaconApiValidatorClient{jsonRestHandler: jsonRestHandler}
res, err := validatorClient.GetAggregatedSelections(ctx, test.req)
if test.expectedErrorMessage != "" {
require.ErrorContains(t, test.expectedErrorMessage, err)
return
}
require.NoError(t, err)
require.DeepEqual(t, test.res, res)
})
}
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"net/url"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -26,6 +27,7 @@ func getPubKeyAndReqBuffer(t *testing.T) ([]byte, *bytes.Buffer) {
Ids: []string{stringPubKey},
Statuses: []string{},
}
reqBytes, err := json.Marshal(req)
require.NoError(t, err)
return pubKey, bytes.NewBuffer(reqBytes)
@@ -192,6 +194,27 @@ func TestIndex_JsonResponseError(t *testing.T) {
errors.New("some specific json error"),
).Times(1)
req := structs.GetValidatorsRequest{
Ids: []string{stringPubKey},
Statuses: []string{},
}
queryParams := url.Values{}
for _, id := range req.Ids {
queryParams.Add("id", id)
}
for _, st := range req.Statuses {
queryParams.Add("status", st)
}
jsonRestHandler.EXPECT().Get(
ctx,
buildURL("/eth/v1/beacon/states/head/validators", queryParams),
&stateValidatorsResponseJson,
).Return(
errors.New("some specific json error"),
).Times(1)
validatorClient := beaconApiValidatorClient{
stateValidatorsProvider: beaconApiStateValidatorsProvider{
jsonRestHandler: jsonRestHandler,

View File

@@ -8,7 +8,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v4/api/server/structs"
structs "github.com/prysmaticlabs/prysm/v4/api/server/structs"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

View File

@@ -9,7 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v4/api/server/structs"
structs "github.com/prysmaticlabs/prysm/v4/api/server/structs"
primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
)

View File

@@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api (interfaces: GenesisProvider)
// Source: validator/client/beacon-api/genesis.go
// Package mock is a generated GoMock package.
package mock
@@ -9,7 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v4/api/server/structs"
structs "github.com/prysmaticlabs/prysm/v4/api/server/structs"
)
// MockGenesisProvider is a mock of GenesisProvider interface.
@@ -36,16 +36,16 @@ func (m *MockGenesisProvider) EXPECT() *MockGenesisProviderMockRecorder {
}
// GetGenesis mocks base method.
func (m *MockGenesisProvider) GetGenesis(arg0 context.Context) (*structs.Genesis, error) {
func (m *MockGenesisProvider) GetGenesis(ctx context.Context) (*structs.Genesis, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetGenesis", arg0)
ret := m.ctrl.Call(m, "GetGenesis", ctx)
ret0, _ := ret[0].(*structs.Genesis)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetGenesis indicates an expected call of GetGenesis.
func (mr *MockGenesisProviderMockRecorder) GetGenesis(arg0 interface{}) *gomock.Call {
func (mr *MockGenesisProviderMockRecorder) GetGenesis(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGenesis", reflect.TypeOf((*MockGenesisProvider)(nil).GetGenesis), arg0)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetGenesis", reflect.TypeOf((*MockGenesisProvider)(nil).GetGenesis), ctx)
}

View File

@@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api (interfaces: JsonRestHandler)
// Source: validator/client/beacon-api/json_rest_handler.go
// Package mock is a generated GoMock package.
package mock
@@ -36,29 +36,29 @@ func (m *MockJsonRestHandler) EXPECT() *MockJsonRestHandlerMockRecorder {
}
// Get mocks base method.
func (m *MockJsonRestHandler) Get(arg0 context.Context, arg1 string, arg2 interface{}) error {
func (m *MockJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Get", arg0, arg1, arg2)
ret := m.ctrl.Call(m, "Get", ctx, endpoint, resp)
ret0, _ := ret[0].(error)
return ret0
}
// Get indicates an expected call of Get.
func (mr *MockJsonRestHandlerMockRecorder) Get(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockJsonRestHandlerMockRecorder) Get(ctx, endpoint, resp interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockJsonRestHandler)(nil).Get), ctx, endpoint, resp)
}
// Post mocks base method.
func (m *MockJsonRestHandler) Post(arg0 context.Context, arg1 string, arg2 map[string]string, arg3 *bytes.Buffer, arg4 interface{}) error {
func (m *MockJsonRestHandler) Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp interface{}) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Post", arg0, arg1, arg2, arg3, arg4)
ret := m.ctrl.Call(m, "Post", ctx, endpoint, headers, data, resp)
ret0, _ := ret[0].(error)
return ret0
}
// Post indicates an expected call of Post.
func (mr *MockJsonRestHandlerMockRecorder) Post(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, resp interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), arg0, arg1, arg2, arg3, arg4)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
}

View File

@@ -9,7 +9,7 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
"github.com/prysmaticlabs/prysm/v4/api/server/structs"
structs "github.com/prysmaticlabs/prysm/v4/api/server/structs"
primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
)

View File

@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"strconv"
"github.com/pkg/errors"
@@ -91,7 +92,31 @@ func (c beaconApiStateValidatorsProvider) getStateValidatorsHelper(
return nil, errors.Wrapf(err, "failed to marshal request into JSON")
}
stateValidatorsJson := &structs.GetValidatorsResponse{}
if err = c.jsonRestHandler.Post(ctx, endpoint, nil, bytes.NewBuffer(reqBytes), stateValidatorsJson); err != nil {
// First try POST endpoint to check whether it is supported by the beacon node.
if err = c.jsonRestHandler.Post(ctx, endpoint, nil, bytes.NewBuffer(reqBytes), stateValidatorsJson); err == nil {
if stateValidatorsJson.Data == nil {
return nil, errors.New("stateValidatorsJson.Data is nil")
}
return stateValidatorsJson, nil
}
// Re-initialise the response just in case.
stateValidatorsJson = &structs.GetValidatorsResponse{}
// Seems like POST isn't supported by the beacon node, let's try the GET one.
queryParams := url.Values{}
for _, id := range req.Ids {
queryParams.Add("id", id)
}
for _, st := range req.Statuses {
queryParams.Add("status", st)
}
query := buildURL(endpoint, queryParams)
err = c.jsonRestHandler.Get(ctx, query, stateValidatorsJson)
if err != nil {
return nil, err
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"net/url"
"testing"
"github.com/golang/mock/gomock"
@@ -15,7 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api/mock"
)
func TestGetStateValidators_Nominal(t *testing.T) {
func TestGetStateValidators_Nominal_POST(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -101,6 +102,112 @@ func TestGetStateValidators_Nominal(t *testing.T) {
assert.DeepEqual(t, wanted, actual.Data)
}
func TestGetStateValidators_Nominal_GET(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
req := &structs.GetValidatorsRequest{
Ids: []string{
"12345",
"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13",
"0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526",
"0x424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242",
"0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5",
},
Statuses: []string{"active_ongoing", "active_exiting", "exited_slashed", "exited_unslashed"},
}
reqBytes, err := json.Marshal(req)
require.NoError(t, err)
stateValidatorsResponseJson := structs.GetValidatorsResponse{}
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
wanted := []*structs.ValidatorContainer{
{
Index: "12345",
Status: "active_ongoing",
Validator: &structs.Validator{
Pubkey: "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be19",
},
},
{
Index: "55293",
Status: "active_ongoing",
Validator: &structs.Validator{
Pubkey: "0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13",
},
},
{
Index: "55294",
Status: "active_exiting",
Validator: &structs.Validator{
Pubkey: "0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526",
},
},
{
Index: "55295",
Status: "exited_slashed",
Validator: &structs.Validator{
Pubkey: "0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5",
},
},
}
ctx := context.Background()
// First return an error from POST call.
jsonRestHandler.EXPECT().Post(
ctx,
"/eth/v1/beacon/states/head/validators",
nil,
bytes.NewBuffer(reqBytes),
&stateValidatorsResponseJson,
).Return(
errors.New("an error"),
).Times(1)
// Then try the GET call which will be successful.
queryParams := url.Values{}
for _, id := range req.Ids {
queryParams.Add("id", id)
}
for _, st := range req.Statuses {
queryParams.Add("status", st)
}
query := buildURL("/eth/v1/beacon/states/head/validators", queryParams)
jsonRestHandler.EXPECT().Get(
ctx,
query,
&stateValidatorsResponseJson,
).Return(
nil,
).SetArg(
2,
structs.GetValidatorsResponse{
Data: wanted,
},
).Times(1)
stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}
actual, err := stateValidatorsProvider.GetStateValidators(ctx, []string{
"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing
"0x80000e851c0f53c3246ff726d7ff7766661ca5e12a07c45c114d208d54f0f8233d4380b2e9aff759d69795d1df905526", // active_exiting
"0x424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242424242", // does not exist
"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing - duplicate
"0x800015473bdc3a7f45ef8eb8abc598bc20021e55ad6e6ad1d745aaef9730dd2c28ec08bf42df18451de94dd4a6d24ec5", // exited_slashed
},
[]primitives.ValidatorIndex{
12345, // active_ongoing
12345, // active_ongoing - duplicate
},
[]string{"active_ongoing", "active_exiting", "exited_slashed", "exited_unslashed"},
)
require.NoError(t, err)
assert.DeepEqual(t, wanted, actual.Data)
}
func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -117,6 +224,7 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) {
ctx := context.Background()
// First call POST.
jsonRestHandler.EXPECT().Post(
ctx,
"/eth/v1/beacon/states/head/validators",
@@ -127,6 +235,25 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) {
errors.New("an error"),
).Times(1)
// Call to GET endpoint upon receiving error from POST call.
queryParams := url.Values{}
for _, id := range req.Ids {
queryParams.Add("id", id)
}
for _, st := range req.Statuses {
queryParams.Add("status", st)
}
query := buildURL("/eth/v1/beacon/states/head/validators", queryParams)
jsonRestHandler.EXPECT().Get(
ctx,
query,
&stateValidatorsResponseJson,
).Return(
errors.New("an error"),
).Times(1)
stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}
_, err = stateValidatorsProvider.GetStateValidators(ctx, []string{
"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing
@@ -137,7 +264,7 @@ func TestGetStateValidators_GetRestJsonResponseOnError(t *testing.T) {
assert.ErrorContains(t, "an error", err)
}
func TestGetStateValidators_DataIsNil(t *testing.T) {
func TestGetStateValidators_DataIsNil_POST(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -175,3 +302,63 @@ func TestGetStateValidators_DataIsNil(t *testing.T) {
)
assert.ErrorContains(t, "stateValidatorsJson.Data is nil", err)
}
func TestGetStateValidators_DataIsNil_GET(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
req := &structs.GetValidatorsRequest{
Ids: []string{"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13"},
Statuses: []string{},
}
reqBytes, err := json.Marshal(req)
require.NoError(t, err)
ctx := context.Background()
stateValidatorsResponseJson := structs.GetValidatorsResponse{}
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
// First call POST which will return an error.
jsonRestHandler.EXPECT().Post(
ctx,
"/eth/v1/beacon/states/head/validators",
nil,
bytes.NewBuffer(reqBytes),
&stateValidatorsResponseJson,
).Return(
errors.New("an error"),
).Times(1)
// Then call GET which returns nil Data.
queryParams := url.Values{}
for _, id := range req.Ids {
queryParams.Add("id", id)
}
for _, st := range req.Statuses {
queryParams.Add("status", st)
}
query := buildURL("/eth/v1/beacon/states/head/validators", queryParams)
jsonRestHandler.EXPECT().Get(
ctx,
query,
&stateValidatorsResponseJson,
).Return(
nil,
).SetArg(
2,
structs.GetValidatorsResponse{
Data: nil,
},
).Times(1)
stateValidatorsProvider := beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}
_, err = stateValidatorsProvider.GetStateValidators(ctx, []string{
"0x8000091c2ae64ee414a54c1cc1fc67dec663408bc636cb86756e0200e41a75c8f86603f104f02c856983d2783116be13", // active_ongoing
},
nil,
nil,
)
assert.ErrorContains(t, "stateValidatorsJson.Data is nil", err)
}

View File

@@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -201,6 +202,27 @@ func TestSubmitAggregateSelectionProof(t *testing.T) {
test.validatorsErr,
).Times(test.validatorsCalled)
if test.validatorsErr != nil {
// Then try the GET call which will also return error.
queryParams := url.Values{}
for _, id := range valsReq.Ids {
queryParams.Add("id", id)
}
for _, st := range valsReq.Statuses {
queryParams.Add("status", st)
}
query := buildURL("/eth/v1/beacon/states/head/validators", queryParams)
jsonRestHandler.EXPECT().Get(
ctx,
query,
&structs.GetValidatorsResponse{},
).Return(
test.validatorsErr,
).Times(1)
}
// Call attester duties endpoint to get attester duties.
validatorIndicesBytes, err := json.Marshal([]string{validatorIndex})
require.NoError(t, err)

View File

@@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"testing"
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -337,6 +338,27 @@ func TestGetSyncSubCommitteeIndex(t *testing.T) {
test.validatorsErr,
).Times(1)
if test.validatorsErr != nil {
// Then try the GET call which will also return error.
queryParams := url.Values{}
for _, id := range valsReq.Ids {
queryParams.Add("id", id)
}
for _, st := range valsReq.Statuses {
queryParams.Add("status", st)
}
query := buildURL("/eth/v1/beacon/states/head/validators", queryParams)
jsonRestHandler.EXPECT().Get(
ctx,
query,
&structs.GetValidatorsResponse{},
).Return(
test.validatorsErr,
).Times(1)
}
validatorIndicesBytes, err := json.Marshal([]string{validatorIndex})
require.NoError(t, err)

View File

@@ -138,6 +138,10 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in)
}
func (grpcValidatorClient) GetAggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
return nil, iface.ErrNotSupported
}
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)}
}

View File

@@ -20,6 +20,7 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library",
],

View File

@@ -2,12 +2,66 @@ package iface
import (
"context"
"encoding/json"
"strconv"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/golang/protobuf/ptypes/empty"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
type BeaconCommitteeSelection struct {
SelectionProof []byte
Slot primitives.Slot
ValidatorIndex primitives.ValidatorIndex
}
type beaconCommitteeSelectionJson struct {
SelectionProof string `json:"selection_proof"`
Slot string `json:"slot"`
ValidatorIndex string `json:"validator_index"`
}
func (b BeaconCommitteeSelection) MarshalJSON() ([]byte, error) {
return json.Marshal(beaconCommitteeSelectionJson{
SelectionProof: hexutil.Encode(b.SelectionProof),
Slot: strconv.FormatUint(uint64(b.Slot), 10),
ValidatorIndex: strconv.FormatUint(uint64(b.ValidatorIndex), 10),
})
}
func (b *BeaconCommitteeSelection) UnmarshalJSON(input []byte) error {
var bjson beaconCommitteeSelectionJson
err := json.Unmarshal(input, &bjson)
if err != nil {
return errors.Wrap(err, "failed to unmarshal beacon committee selection")
}
slot, err := strconv.ParseUint(bjson.Slot, 10, 64)
if err != nil {
return errors.Wrap(err, "failed to parse slot")
}
vIdx, err := strconv.ParseUint(bjson.ValidatorIndex, 10, 64)
if err != nil {
return errors.Wrap(err, "failed to parse validator index")
}
selectionProof, err := hexutil.Decode(bjson.SelectionProof)
if err != nil {
return errors.Wrap(err, "failed to parse selection proof")
}
b.Slot = primitives.Slot(slot)
b.SelectionProof = selectionProof
b.ValidatorIndex = primitives.ValidatorIndex(vIdx)
return nil
}
type ValidatorClient interface {
GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error)
DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error)
@@ -36,4 +90,5 @@ type ValidatorClient interface {
SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error)
StartEventStream(ctx context.Context) error
EventStreamIsRunning() bool
GetAggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error)
}

View File

@@ -56,6 +56,7 @@ type ValidatorService struct {
useWeb bool
emitAccountMetrics bool
logValidatorBalances bool
distributed bool
interopKeysConfig *local.InteropKeymanagerConfig
conn validatorHelpers.NodeConnection
grpcRetryDelay time.Duration
@@ -83,6 +84,7 @@ type Config struct {
UseWeb bool
LogValidatorBalances bool
EmitAccountMetrics bool
Distributed bool
InteropKeysConfig *local.InteropKeymanagerConfig
Wallet *wallet.Wallet
WalletInitializedFeed *event.Feed
@@ -131,6 +133,7 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
Web3SignerConfig: cfg.Web3SignerConfig,
proposerSettings: cfg.ProposerSettings,
validatorsRegBatchSize: cfg.ValidatorsRegBatchSize,
distributed: cfg.Distributed,
}
dialOpts := ConstructDialOptions(
@@ -230,6 +233,8 @@ func (v *ValidatorService) Start() {
proposerSettings: v.proposerSettings,
walletInitializedChannel: make(chan *wallet.Wallet, 1),
validatorsRegBatchSize: v.validatorsRegBatchSize,
distributed: v.distributed,
attSelections: make(map[attSelectionKey]iface.BeaconCommitteeSelection),
}
v.validator = valStruct

View File

@@ -67,12 +67,14 @@ type validator struct {
logValidatorBalances bool
useWeb bool
emitAccountMetrics bool
distributed bool
domainDataLock sync.RWMutex
attLogsLock sync.Mutex
aggregatedSlotCommitteeIDCacheLock sync.Mutex
highestValidSlotLock sync.Mutex
prevBalanceLock sync.RWMutex
slashableKeysLock sync.RWMutex
attSelectionLock sync.Mutex
eipImportBlacklistedPublicKeys map[[fieldparams.BLSPubkeyLength]byte]bool
walletInitializedFeed *event.Feed
attLogs map[[32]byte]*attSubmitted
@@ -82,6 +84,7 @@ type validator struct {
prevBalance map[[fieldparams.BLSPubkeyLength]byte]uint64
pubkeyToValidatorIndex map[[fieldparams.BLSPubkeyLength]byte]primitives.ValidatorIndex
signedValidatorRegistrations map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1
attSelections map[attSelectionKey]iface.BeaconCommitteeSelection
graffitiOrderedIndex uint64
aggregatedSlotCommitteeIDCache *lru.Cache
domainDataCache *ristretto.Cache
@@ -113,6 +116,11 @@ type validatorStatus struct {
index primitives.ValidatorIndex
}
type attSelectionKey struct {
slot primitives.Slot
index primitives.ValidatorIndex
}
// Done cleans up the validator.
func (v *validator) Done() {
v.ticker.Done()
@@ -629,6 +637,13 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
subscribeValidatorIndices := make([]primitives.ValidatorIndex, 0, len(res.CurrentEpochDuties)+len(res.NextEpochDuties))
alreadySubscribed := make(map[[64]byte]bool)
if v.distributed {
// Get aggregated selection proofs to calculate isAggregator.
if err := v.getAggregatedSelectionProofs(ctx, res); err != nil {
return errors.Wrap(err, "could not get aggregated selection proofs")
}
}
for _, duty := range res.CurrentEpochDuties {
pk := bytesutil.ToBytes48(duty.PublicKey)
if duty.Status == ethpb.ValidatorStatus_ACTIVE || duty.Status == ethpb.ValidatorStatus_EXITING {
@@ -641,7 +656,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
continue
}
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, pk)
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, pk, validatorIndex)
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
@@ -667,7 +682,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes
continue
}
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey))
aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey), validatorIndex)
if err != nil {
return errors.Wrap(err, "could not check if a validator is an aggregator")
}
@@ -718,7 +733,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
if duty.AttesterSlot == slot {
roles = append(roles, iface.RoleAttester)
aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey))
aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "could not check if a validator is an aggregator")
}
@@ -773,15 +788,26 @@ func (v *validator) Keymanager() (keymanager.IKeymanager, error) {
// isAggregator checks if a validator is an aggregator of a given slot and committee,
// it uses a modulo calculated by validator count in committee and samples randomness around it.
func (v *validator) isAggregator(ctx context.Context, committee []primitives.ValidatorIndex, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) (bool, error) {
func (v *validator) isAggregator(ctx context.Context, committee []primitives.ValidatorIndex, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) {
modulo := uint64(1)
if len(committee)/int(params.BeaconConfig().TargetAggregatorsPerCommittee) > 1 {
modulo = uint64(len(committee)) / params.BeaconConfig().TargetAggregatorsPerCommittee
}
slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot)
if err != nil {
return false, err
var (
slotSig []byte
err error
)
if v.distributed {
slotSig, err = v.getAttSelection(attSelectionKey{slot: slot, index: validatorIndex})
if err != nil {
return false, err
}
} else {
slotSig, err = v.signSlotWithSelectionProof(ctx, pubKey, slot)
if err != nil {
return false, err
}
}
b := hash.Hash(slotSig)
@@ -1230,6 +1256,89 @@ func (v *validator) validatorIndex(ctx context.Context, pubkey [fieldparams.BLSP
return resp.Index, true, nil
}
func (v *validator) getAggregatedSelectionProofs(ctx context.Context, duties *ethpb.DutiesResponse) error {
// Create new instance of attestation selections map.
v.newAttSelections()
var req []iface.BeaconCommitteeSelection
for _, duty := range duties.CurrentEpochDuties {
if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING {
continue
}
pk := bytesutil.ToBytes48(duty.PublicKey)
slotSig, err := v.signSlotWithSelectionProof(ctx, pk, duty.AttesterSlot)
if err != nil {
return err
}
req = append(req, iface.BeaconCommitteeSelection{
SelectionProof: slotSig,
Slot: duty.AttesterSlot,
ValidatorIndex: duty.ValidatorIndex,
})
}
for _, duty := range duties.NextEpochDuties {
if duty.Status != ethpb.ValidatorStatus_ACTIVE && duty.Status != ethpb.ValidatorStatus_EXITING {
continue
}
pk := bytesutil.ToBytes48(duty.PublicKey)
slotSig, err := v.signSlotWithSelectionProof(ctx, pk, duty.AttesterSlot)
if err != nil {
return err
}
req = append(req, iface.BeaconCommitteeSelection{
SelectionProof: slotSig,
Slot: duty.AttesterSlot,
ValidatorIndex: duty.ValidatorIndex,
})
}
resp, err := v.validatorClient.GetAggregatedSelections(ctx, req)
if err != nil {
return err
}
// Store aggregated selection proofs in state.
v.addAttSelections(resp)
return nil
}
func (v *validator) addAttSelections(selections []iface.BeaconCommitteeSelection) {
v.attSelectionLock.Lock()
defer v.attSelectionLock.Unlock()
for _, s := range selections {
v.attSelections[attSelectionKey{
slot: s.Slot,
index: s.ValidatorIndex,
}] = s
}
}
func (v *validator) newAttSelections() {
v.attSelectionLock.Lock()
defer v.attSelectionLock.Unlock()
v.attSelections = make(map[attSelectionKey]iface.BeaconCommitteeSelection)
}
func (v *validator) getAttSelection(key attSelectionKey) ([]byte, error) {
v.attSelectionLock.Lock()
defer v.attSelectionLock.Unlock()
s, ok := v.attSelections[key]
if !ok {
return nil, errors.Errorf("selection proof not found for the given slot=%d and validator_index=%d", key.slot, key.index)
}
return s.SelectionProof, nil
}
// This constructs a validator subscribed key, it's used to track
// which subnet has already been pending requested.
func validatorSubscribeKey(slot primitives.Slot, committeeID primitives.CommitteeIndex) [64]byte {

View File

@@ -639,6 +639,92 @@ func TestUpdateDuties_AllValidatorsExited(t *testing.T) {
}
func TestUpdateDuties_Distributed(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := validatormock.NewMockValidatorClient(ctrl)
// Start of third epoch.
slot := 2 * params.BeaconConfig().SlotsPerEpoch
keys := randKeypair(t)
resp := &ethpb.DutiesResponse{
CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{
{
AttesterSlot: slot, // First slot in epoch.
ValidatorIndex: 200,
CommitteeIndex: 100,
PublicKey: keys.pub[:],
Status: ethpb.ValidatorStatus_ACTIVE,
},
},
NextEpochDuties: []*ethpb.DutiesResponse_Duty{
{
AttesterSlot: slot + params.BeaconConfig().SlotsPerEpoch, // First slot in next epoch.
ValidatorIndex: 200,
CommitteeIndex: 100,
PublicKey: keys.pub[:],
Status: ethpb.ValidatorStatus_ACTIVE,
},
},
}
v := validator{
keyManager: newMockKeymanager(t, keys),
validatorClient: client,
distributed: true,
}
sigDomain := make([]byte, 32)
client.EXPECT().GetDuties(
gomock.Any(),
gomock.Any(),
).Return(resp, nil)
client.EXPECT().DomainData(
gomock.Any(), // ctx
gomock.Any(), // epoch
).Return(
&ethpb.DomainResponse{SignatureDomain: sigDomain},
nil, /*err*/
).Times(2)
client.EXPECT().GetAggregatedSelections(
gomock.Any(),
gomock.Any(), // fill this properly
).Return(
[]iface.BeaconCommitteeSelection{
{
SelectionProof: make([]byte, 32),
Slot: slot,
ValidatorIndex: 200,
},
{
SelectionProof: make([]byte, 32),
Slot: slot + params.BeaconConfig().SlotsPerEpoch,
ValidatorIndex: 200,
},
},
nil,
)
var wg sync.WaitGroup
wg.Add(1)
client.EXPECT().SubscribeCommitteeSubnets(
gomock.Any(),
gomock.Any(),
gomock.Any(),
).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []primitives.ValidatorIndex) (*emptypb.Empty, error) {
wg.Done()
return nil, nil
})
require.NoError(t, v.UpdateDuties(context.Background(), slot), "Could not update assignments")
util.WaitTimeout(&wg, 2*time.Second)
require.Equal(t, 2, len(v.attSelections))
}
func TestRolesAt_OK(t *testing.T) {
v, m, validatorKey, finish := setup(t)
defer finish()
@@ -2173,7 +2259,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigDisabled(t *testing.T) {
ctx := context.Background()
client := validatormock.NewMockValidatorClient(ctrl)
signature := blsmock.NewSignature(ctrl)
signature := blsmock.NewMockSignature(ctrl)
signature.EXPECT().Marshal().Return([]byte{})
v := validator{
@@ -2258,7 +2344,7 @@ func TestValidator_buildSignedRegReqs_DefaultConfigEnabled(t *testing.T) {
ctx := context.Background()
client := validatormock.NewMockValidatorClient(ctrl)
signature := blsmock.NewSignature(ctrl)
signature := blsmock.NewMockSignature(ctrl)
signature.EXPECT().Marshal().Return([]byte{}).Times(2)
v := validator{
@@ -2380,7 +2466,7 @@ func TestValidator_buildSignedRegReqs_TimestampBeforeGenesis(t *testing.T) {
ctx := context.Background()
client := validatormock.NewMockValidatorClient(ctrl)
signature := blsmock.NewSignature(ctrl)
signature := blsmock.NewMockSignature(ctrl)
v := validator{
signedValidatorRegistrations: map[[48]byte]*ethpb.SignedValidatorRegistrationV1{},

View File

@@ -497,6 +497,7 @@ func (c *ValidatorClient) registerValidatorService(cliCtx *cli.Context) error {
BeaconApiTimeout: time.Second * 30,
BeaconApiEndpoint: c.cliCtx.String(flags.BeaconRESTApiProviderFlag.Name),
ValidatorsRegBatchSize: c.cliCtx.Int(flags.ValidatorsRegistrationBatchSizeFlag.Name),
Distributed: c.cliCtx.Bool(flags.EnableDistributed.Name),
})
if err != nil {
return errors.Wrap(err, "could not initialize validator service")