diff --git a/changelog/james-prysm_poc-committee-removal.md b/changelog/james-prysm_poc-committee-removal.md new file mode 100644 index 0000000000..8461e23312 --- /dev/null +++ b/changelog/james-prysm_poc-committee-removal.md @@ -0,0 +1,5 @@ +### Changed +- The validator client will no longer use the full list of committee values but instead use the committee length and validator committee index. + +### Removed +- Remove /eth/v1/beacon/states/head/committees call when getting duties. \ No newline at end of file diff --git a/proto/prysm/v1alpha1/BUILD.bazel b/proto/prysm/v1alpha1/BUILD.bazel index aaf162fbad..a6bcdd8987 100644 --- a/proto/prysm/v1alpha1/BUILD.bazel +++ b/proto/prysm/v1alpha1/BUILD.bazel @@ -342,6 +342,7 @@ go_library( "eip_7521.go", "sync_committee_mainnet.go", "sync_committee_minimal.go", # keep + "validator.go", ":ssz_generated_altair", # keep ":ssz_generated_bellatrix", # keep ":ssz_generated_capella", # keep diff --git a/proto/prysm/v1alpha1/validator.go b/proto/prysm/v1alpha1/validator.go new file mode 100644 index 0000000000..b22f616095 --- /dev/null +++ b/proto/prysm/v1alpha1/validator.go @@ -0,0 +1,25 @@ +package eth + +import ( + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" +) + +// ValidatorDutiesContainer is a wrapper that can be both used for the gRPC DutiesResponse and Rest API response structs for attestation, proposer, and sync duties. +type ValidatorDutiesContainer struct { + CurrentEpochDuties []*ValidatorDuty + NextEpochDuties []*ValidatorDuty +} + +// ValidatorDuty is all the information needed to execute validator duties +type ValidatorDuty struct { + CommitteeLength uint64 + CommitteeIndex primitives.CommitteeIndex + CommitteesAtSlot uint64 + ValidatorCommitteeIndex uint64 + AttesterSlot primitives.Slot + ProposerSlots []primitives.Slot + PublicKey []byte + Status ValidatorStatus + ValidatorIndex primitives.ValidatorIndex + IsSyncCommittee bool +} diff --git a/testing/validator-mock/validator_client_mock.go b/testing/validator-mock/validator_client_mock.go index 274ebb39d6..f576aa7bb0 100644 --- a/testing/validator-mock/validator_client_mock.go +++ b/testing/validator-mock/validator_client_mock.go @@ -135,10 +135,10 @@ func (mr *MockValidatorClientMockRecorder) DomainData(arg0, arg1 any) *gomock.Ca } // Duties mocks base method. -func (m *MockValidatorClient) Duties(arg0 context.Context, arg1 *eth.DutiesRequest) (*eth.DutiesResponse, error) { +func (m *MockValidatorClient) Duties(arg0 context.Context, arg1 *eth.DutiesRequest) (*eth.ValidatorDutiesContainer, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Duties", arg0, arg1) - ret0, _ := ret[0].(*eth.DutiesResponse) + ret0, _ := ret[0].(*eth.ValidatorDutiesContainer) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -412,7 +412,7 @@ func (mr *MockValidatorClientMockRecorder) SubmitValidatorRegistrations(arg0, ar } // SubscribeCommitteeSubnets mocks base method. -func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest, arg2 []*eth.DutiesResponse_Duty) (*emptypb.Empty, error) { +func (m *MockValidatorClient) SubscribeCommitteeSubnets(arg0 context.Context, arg1 *eth.CommitteeSubnetsSubscribeRequest, arg2 []*eth.ValidatorDuty) (*emptypb.Empty, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SubscribeCommitteeSubnets", arg0, arg1, arg2) ret0, _ := ret[0].(*emptypb.Empty) diff --git a/validator/client/aggregate.go b/validator/client/aggregate.go index de7658fd7b..ee1022cb9e 100644 --- a/validator/client/aggregate.go +++ b/validator/client/aggregate.go @@ -91,14 +91,14 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives // TODO: look at renaming SubmitAggregateSelectionProof functions as they are GET beacon API var agg ethpb.AggregateAttAndProof if postElectra { - res, err := v.validatorClient.SubmitAggregateSelectionProofElectra(ctx, aggSelectionRequest, duty.ValidatorIndex, uint64(len(duty.Committee))) + res, err := v.validatorClient.SubmitAggregateSelectionProofElectra(ctx, aggSelectionRequest, duty.ValidatorIndex, duty.CommitteeLength) if err != nil { v.handleSubmitAggSelectionProofError(err, slot, fmtKey) return } agg = res.AggregateAndProof } else { - res, err := v.validatorClient.SubmitAggregateSelectionProof(ctx, aggSelectionRequest, duty.ValidatorIndex, uint64(len(duty.Committee))) + res, err := v.validatorClient.SubmitAggregateSelectionProof(ctx, aggSelectionRequest, duty.ValidatorIndex, duty.CommitteeLength) if err != nil { v.handleSubmitAggSelectionProofError(err, slot, fmtKey) return diff --git a/validator/client/aggregate_test.go b/validator/client/aggregate_test.go index be61693530..09a016dd92 100644 --- a/validator/client/aggregate_test.go +++ b/validator/client/aggregate_test.go @@ -27,7 +27,7 @@ func TestSubmitAggregateAndProof_GetDutiesRequestFailure(t *testing.T) { t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) { hook := logTest.NewGlobal() validator, _, validatorKey, finish := setup(t, isSlashingProtectionMinimal) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{}} + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{}} defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte @@ -46,8 +46,8 @@ func TestSubmitAggregateAndProof_SignFails(t *testing.T) { defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { PublicKey: validatorKey.PublicKey().Marshal(), }, @@ -91,8 +91,8 @@ func TestSubmitAggregateAndProof_Ok(t *testing.T) { defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { PublicKey: validatorKey.PublicKey().Marshal(), }, @@ -144,8 +144,8 @@ func TestSubmitAggregateAndProof_Ok(t *testing.T) { defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { PublicKey: validatorKey.PublicKey().Marshal(), }, @@ -198,8 +198,8 @@ func TestSubmitAggregateAndProof_Distributed(t *testing.T) { var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { PublicKey: validatorKey.PublicKey().Marshal(), ValidatorIndex: validatorIdx, diff --git a/validator/client/attest.go b/validator/client/attest.go index 8c201f3279..cae6ed6fdc 100644 --- a/validator/client/attest.go +++ b/validator/client/attest.go @@ -66,7 +66,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, tracing.AnnotateError(span, err) return } - if len(duty.Committee) == 0 { + if duty.CommitteeLength == 0 { log.Debug("Empty committee for validator duty, not attesting") return } @@ -145,24 +145,8 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, attestation = sa attResp, err = v.validatorClient.ProposeAttestationElectra(ctx, sa) } else { - var indexInCommittee uint64 - var found bool - for i, vID := range duty.Committee { - if vID == duty.ValidatorIndex { - indexInCommittee = uint64(i) - found = true - break - } - } - if !found { - log.Errorf("Validator ID %d not found in committee of %v", duty.ValidatorIndex, duty.Committee) - if v.emitAccountMetrics { - ValidatorAttestFailVec.WithLabelValues(fmtKey).Inc() - } - return - } - aggregationBitfield = bitfield.NewBitlist(uint64(len(duty.Committee))) - aggregationBitfield.SetBitAt(indexInCommittee, true) + aggregationBitfield = bitfield.NewBitlist(duty.CommitteeLength) + aggregationBitfield.SetBitAt(duty.ValidatorCommitteeIndex, true) a := ðpb.Attestation{ Data: data, AggregationBits: aggregationBitfield, @@ -211,7 +195,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot, } // Given the validator public key, this gets the validator assignment. -func (v *validator) duty(pubKey [fieldparams.BLSPubkeyLength]byte) (*ethpb.DutiesResponse_Duty, error) { +func (v *validator) duty(pubKey [fieldparams.BLSPubkeyLength]byte) (*ethpb.ValidatorDuty, error) { v.dutiesLock.RLock() defer v.dutiesLock.RUnlock() if v.duties == nil { diff --git a/validator/client/attest_test.go b/validator/client/attest_test.go index 4ed02f666e..061234f0d1 100644 --- a/validator/client/attest_test.go +++ b/validator/client/attest_test.go @@ -34,7 +34,7 @@ func TestRequestAttestation_ValidatorDutiesRequestFailure(t *testing.T) { t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) { hook := logTest.NewGlobal() validator, _, validatorKey, finish := setup(t, isSlashingProtectionMinimal) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{}} + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{}} defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte @@ -54,11 +54,10 @@ func TestAttestToBlockHead_SubmitAttestation_EmptyCommittee(t *testing.T) { defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { PublicKey: validatorKey.PublicKey().Marshal(), CommitteeIndex: 0, - Committee: make([]primitives.ValidatorIndex, 0), ValidatorIndex: 0, }}} validator.SubmitAttestation(context.Background(), 0, pubKey) @@ -74,12 +73,12 @@ func TestAttestToBlockHead_SubmitAttestation_RequestFailure(t *testing.T) { validator, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) defer finish() - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: make([]primitives.ValidatorIndex, 111), - ValidatorIndex: 0, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: 111, + ValidatorIndex: 0, }}} m.validatorClient.EXPECT().AttestationData( gomock.Any(), // ctx @@ -116,12 +115,13 @@ func TestAttestToBlockHead_AttestsCorrectly(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorCommitteeIndex: 4, + ValidatorIndex: validatorIndex, }, }} @@ -196,12 +196,12 @@ func TestAttestToBlockHead_AttestsCorrectly(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} @@ -276,12 +276,12 @@ func TestAttestToBlockHead_BlocksDoubleAtt(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} beaconBlockRoot := bytesutil.ToBytes32([]byte("A")) @@ -332,12 +332,12 @@ func TestAttestToBlockHead_BlocksSurroundAtt(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} beaconBlockRoot := bytesutil.ToBytes32([]byte("A")) @@ -388,12 +388,12 @@ func TestAttestToBlockHead_BlocksSurroundedAtt(t *testing.T) { var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} beaconBlockRoot := bytesutil.ToBytes32([]byte("A")) @@ -483,12 +483,12 @@ func TestAttestToBlockHead_DoesAttestAfterDelay(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }}} m.validatorClient.EXPECT().AttestationData( @@ -526,12 +526,12 @@ func TestAttestToBlockHead_CorrectBitfieldLength(t *testing.T) { committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} var pubKey [fieldparams.BLSPubkeyLength]byte copy(pubKey[:], validatorKey.PublicKey().Marshal()) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - CommitteeIndex: 5, - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeIndex: 5, + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }}} m.validatorClient.EXPECT().AttestationData( gomock.Any(), // ctx diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index e0f923bbc7..5ddff0730f 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -47,10 +47,10 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...Valida return c } -func (c *beaconApiValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { +func (c *beaconApiValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) { ctx, span := trace.StartSpan(ctx, "beacon-api.Duties") defer span.End() - return wrapInMetrics[*ethpb.DutiesResponse]("Duties", func() (*ethpb.DutiesResponse, error) { + return wrapInMetrics[*ethpb.ValidatorDutiesContainer]("Duties", func() (*ethpb.ValidatorDutiesContainer, error) { return c.duties(ctx, in) }) } @@ -248,7 +248,7 @@ func (c *beaconApiValidatorClient) SubmitValidatorRegistrations(ctx context.Cont }) } -func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.DutiesResponse_Duty) (*empty.Empty, error) { +func (c *beaconApiValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.ValidatorDuty) (*empty.Empty, error) { ctx, span := trace.StartSpan(ctx, "beacon-api.SubscribeCommitteeSubnets") defer span.End() diff --git a/validator/client/beacon-api/duties.go b/validator/client/beacon-api/duties.go index 2cc26fdc20..b83739e75e 100644 --- a/validator/client/beacon-api/duties.go +++ b/validator/client/beacon-api/duties.go @@ -29,9 +29,12 @@ type beaconApiDutiesProvider struct { jsonRestHandler JsonRestHandler } -type committeeIndexSlotPair struct { - committeeIndex primitives.CommitteeIndex - slot primitives.Slot +type attesterDuty struct { + committeeIndex primitives.CommitteeIndex + slot primitives.Slot + committeeLength uint64 + validatorCommitteeIndex uint64 + committeesAtSlot uint64 } type validatorForDuty struct { @@ -40,7 +43,7 @@ type validatorForDuty struct { status ethpb.ValidatorStatus } -func (c *beaconApiValidatorClient) duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { +func (c *beaconApiValidatorClient) duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) { vals, err := c.validatorsForDuties(ctx, in.PublicKeys) if err != nil { return nil, errors.Wrap(err, "failed to get validators for duties") @@ -51,7 +54,7 @@ func (c *beaconApiValidatorClient) duties(ctx context.Context, in *ethpb.DutiesR errCh := make(chan error, 1) - var currentEpochDuties []*ethpb.DutiesResponse_Duty + var currentEpochDuties []*ethpb.ValidatorDuty go func() { currentEpochDuties, err = c.dutiesForEpoch(ctx, in.Epoch, vals, fetchSyncDuties) if err != nil { @@ -70,7 +73,7 @@ func (c *beaconApiValidatorClient) duties(ctx context.Context, in *ethpb.DutiesR return nil, err } - return ðpb.DutiesResponse{ + return ðpb.ValidatorDutiesContainer{ CurrentEpochDuties: currentEpochDuties, NextEpochDuties: nextEpochDuties, }, nil @@ -81,7 +84,7 @@ func (c *beaconApiValidatorClient) dutiesForEpoch( epoch primitives.Epoch, vals []validatorForDuty, fetchSyncDuties bool, -) ([]*ethpb.DutiesResponse_Duty, error) { +) ([]*ethpb.ValidatorDuty, error) { indices := make([]primitives.ValidatorIndex, len(vals)) for i, v := range vals { indices[i] = v.index @@ -92,13 +95,11 @@ func (c *beaconApiValidatorClient) dutiesForEpoch( // will return only once all goroutines finish their execution. // Mapping from a validator index to its attesting committee's index and slot - attesterDutiesMapping := make(map[primitives.ValidatorIndex]committeeIndexSlotPair) + attesterDutiesMapping := make(map[primitives.ValidatorIndex]attesterDuty) // Set containing all validator indices that are part of a sync committee for this epoch syncDutiesMapping := make(map[primitives.ValidatorIndex]bool) // Mapping from a validator index to its proposal slot proposerDutySlots := make(map[primitives.ValidatorIndex][]primitives.Slot) - // Mapping from the {committeeIndex, slot} to each of the committee's validator indices - committeeMapping := make(map[committeeIndexSlotPair][]primitives.ValidatorIndex) var wg errgroup.Group @@ -108,22 +109,37 @@ func (c *beaconApiValidatorClient) dutiesForEpoch( return errors.Wrapf(err, "failed to get attester duties for epoch `%d`", epoch) } - for _, attesterDuty := range attesterDuties { - validatorIndex, err := strconv.ParseUint(attesterDuty.ValidatorIndex, 10, 64) + for _, duty := range attesterDuties { + validatorIndex, err := strconv.ParseUint(duty.ValidatorIndex, 10, 64) if err != nil { - return errors.Wrapf(err, "failed to parse attester validator index `%s`", attesterDuty.ValidatorIndex) + return errors.Wrapf(err, "failed to parse attester validator index `%s`", duty.ValidatorIndex) } - slot, err := strconv.ParseUint(attesterDuty.Slot, 10, 64) + slot, err := strconv.ParseUint(duty.Slot, 10, 64) if err != nil { - return errors.Wrapf(err, "failed to parse attester slot `%s`", attesterDuty.Slot) + return errors.Wrapf(err, "failed to parse attester slot `%s`", duty.Slot) } - committeeIndex, err := strconv.ParseUint(attesterDuty.CommitteeIndex, 10, 64) + committeeIndex, err := strconv.ParseUint(duty.CommitteeIndex, 10, 64) if err != nil { - return errors.Wrapf(err, "failed to parse attester committee index `%s`", attesterDuty.CommitteeIndex) + return errors.Wrapf(err, "failed to parse attester committee index `%s`", duty.CommitteeIndex) } - attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = committeeIndexSlotPair{ - slot: primitives.Slot(slot), - committeeIndex: primitives.CommitteeIndex(committeeIndex), + committeeLength, err := strconv.ParseUint(duty.CommitteeLength, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse attester committee length `%s`", duty.CommitteeLength) + } + validatorCommitteeIndex, err := strconv.ParseUint(duty.ValidatorCommitteeIndex, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse attester validator committee index `%s`", duty.ValidatorCommitteeIndex) + } + committeesAtSlot, err := strconv.ParseUint(duty.CommitteesAtSlot, 10, 64) + if err != nil { + return errors.Wrapf(err, "failed to parse attester committees at slot `%s`", duty.CommitteesAtSlot) + } + attesterDutiesMapping[primitives.ValidatorIndex(validatorIndex)] = attesterDuty{ + slot: primitives.Slot(slot), + committeeIndex: primitives.CommitteeIndex(committeeIndex), + committeeLength: committeeLength, + validatorCommitteeIndex: validatorCommitteeIndex, + committeesAtSlot: committeesAtSlot, } } return nil @@ -135,7 +151,6 @@ func (c *beaconApiValidatorClient) dutiesForEpoch( if err != nil { return errors.Wrapf(err, "failed to get sync duties for epoch `%d`", epoch) } - for _, syncDuty := range syncDuties { validatorIndex, err := strconv.ParseUint(syncDuty.ValidatorIndex, 10, 64) if err != nil { @@ -168,74 +183,28 @@ func (c *beaconApiValidatorClient) dutiesForEpoch( return nil }) - committees, err := c.dutiesProvider.Committees(ctx, epoch) - if err != nil { - return nil, errors.Wrapf(err, "failed to get committees for epoch `%d`", epoch) - } - slotCommittees := make(map[string]uint64) - for _, c := range committees { - n, ok := slotCommittees[c.Slot] - if !ok { - n = 0 - } - slotCommittees[c.Slot] = n + 1 - } - - for _, committee := range committees { - committeeIndex, err := strconv.ParseUint(committee.Index, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse committee index `%s`", committee.Index) - } - slot, err := strconv.ParseUint(committee.Slot, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse slot `%s`", committee.Slot) - } - validatorIndices := make([]primitives.ValidatorIndex, len(committee.Validators)) - for index, validatorIndexString := range committee.Validators { - validatorIndex, err := strconv.ParseUint(validatorIndexString, 10, 64) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse committee validator index `%s`", validatorIndexString) - } - validatorIndices[index] = primitives.ValidatorIndex(validatorIndex) - } - key := committeeIndexSlotPair{ - committeeIndex: primitives.CommitteeIndex(committeeIndex), - slot: primitives.Slot(slot), - } - committeeMapping[key] = validatorIndices - } - - if err = wg.Wait(); err != nil { + if err := wg.Wait(); err != nil { return nil, err } - duties := make([]*ethpb.DutiesResponse_Duty, len(vals)) + duties := make([]*ethpb.ValidatorDuty, len(vals)) for i, v := range vals { - var ( - attesterSlot primitives.Slot - committeeIndex primitives.CommitteeIndex - committeeValidatorIndices []primitives.ValidatorIndex - ) - - if committeeMappingKey, ok := attesterDutiesMapping[v.index]; ok { - committeeIndex = committeeMappingKey.committeeIndex - attesterSlot = committeeMappingKey.slot - - if committeeValidatorIndices, ok = committeeMapping[committeeMappingKey]; !ok { - return nil, errors.Errorf("failed to find validators for committee index `%d` and slot `%d`", committeeIndex, attesterSlot) - } + att, ok := attesterDutiesMapping[v.index] + if !ok { + log.Debugf("failed to find attester duty for validator `%d`", v.index) } - duties[i] = ðpb.DutiesResponse_Duty{ - Committee: committeeValidatorIndices, - CommitteeIndex: committeeIndex, - AttesterSlot: attesterSlot, - ProposerSlots: proposerDutySlots[v.index], - PublicKey: v.pubkey, - Status: v.status, - ValidatorIndex: v.index, - IsSyncCommittee: syncDutiesMapping[v.index], - CommitteesAtSlot: slotCommittees[strconv.FormatUint(uint64(attesterSlot), 10)], + duties[i] = ðpb.ValidatorDuty{ + ValidatorCommitteeIndex: att.validatorCommitteeIndex, + CommitteeLength: att.committeeLength, + CommitteeIndex: att.committeeIndex, + AttesterSlot: att.slot, + CommitteesAtSlot: att.committeesAtSlot, + ProposerSlots: proposerDutySlots[v.index], + PublicKey: v.pubkey, + Status: v.status, + ValidatorIndex: v.index, + IsSyncCommittee: syncDutiesMapping[v.index], } } @@ -287,7 +256,7 @@ func (c *beaconApiValidatorClient) validatorsForDuties(ctx context.Context, pubk return vals, nil } -// GetCommittees retrieves the committees for the given epoch +// Committees retrieves the committees for the given epoch func (c beaconApiDutiesProvider) Committees(ctx context.Context, epoch primitives.Epoch) ([]*structs.Committee, error) { committeeParams := url.Values{} committeeParams.Add("epoch", strconv.FormatUint(uint64(epoch), 10)) @@ -311,7 +280,7 @@ func (c beaconApiDutiesProvider) Committees(ctx context.Context, epoch primitive return stateCommittees.Data, nil } -// GetAttesterDuties retrieves the attester duties for the given epoch and validatorIndices +// AttesterDuties retrieves the attester duties for the given epoch and validatorIndices func (c beaconApiDutiesProvider) AttesterDuties(ctx context.Context, epoch primitives.Epoch, validatorIndices []primitives.ValidatorIndex) ([]*structs.AttesterDuty, error) { jsonValidatorIndices := make([]string, len(validatorIndices)) for index, validatorIndex := range validatorIndices { @@ -343,7 +312,7 @@ func (c beaconApiDutiesProvider) AttesterDuties(ctx context.Context, epoch primi return attesterDuties.Data, nil } -// GetProposerDuties retrieves the proposer duties for the given epoch +// ProposerDuties retrieves the proposer duties for the given epoch func (c beaconApiDutiesProvider) ProposerDuties(ctx context.Context, epoch primitives.Epoch) ([]*structs.ProposerDuty, error) { proposerDuties := structs.GetProposerDutiesResponse{} if err := c.jsonRestHandler.Get(ctx, fmt.Sprintf("/eth/v1/validator/duties/proposer/%d", epoch), &proposerDuties); err != nil { @@ -363,7 +332,7 @@ func (c beaconApiDutiesProvider) ProposerDuties(ctx context.Context, epoch primi return proposerDuties.Data, nil } -// GetSyncDuties retrieves the sync committee duties for the given epoch and validatorIndices +// SyncDuties retrieves the sync committee duties for the given epoch and validatorIndices func (c beaconApiDutiesProvider) SyncDuties(ctx context.Context, epoch primitives.Epoch, validatorIndices []primitives.ValidatorIndex) ([]*structs.SyncCommitteeDuty, error) { jsonValidatorIndices := make([]string, len(validatorIndices)) for index, validatorIndex := range validatorIndices { diff --git a/validator/client/beacon-api/duties_test.go b/validator/client/beacon-api/duties_test.go index 4e696d1bd1..b56ebaf4d3 100644 --- a/validator/client/beacon-api/duties_test.go +++ b/validator/client/beacon-api/duties_test.go @@ -527,8 +527,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { fetchProposerDutiesError error generateSyncDuties func() []*structs.SyncCommitteeDuty fetchSyncDutiesError error - generateCommittees func() []*structs.Committee - fetchCommitteesError error }{ { name: "get attester duties failed", @@ -545,11 +543,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { expectedError: "failed to get sync duties for epoch `1`: foo error", fetchSyncDutiesError: errors.New("foo error"), }, - { - name: "get committees failed", - expectedError: "failed to get committees for epoch `1`: foo error", - fetchCommitteesError: errors.New("foo error"), - }, { name: "bad attester validator index", expectedError: "failed to parse attester validator index `foo`", @@ -604,46 +597,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { return syncDuties }, }, - { - name: "bad committee index", - expectedError: "failed to parse committee index `foo`", - generateCommittees: func() []*structs.Committee { - committees := generateValidCommittees(committeeIndices, committeeSlots, validatorIndices) - committees[0].Index = "foo" - return committees - }, - }, - { - name: "bad committee slot", - expectedError: "failed to parse slot `foo`", - generateCommittees: func() []*structs.Committee { - committees := generateValidCommittees(committeeIndices, committeeSlots, validatorIndices) - committees[0].Slot = "foo" - return committees - }, - }, - { - name: "bad committee validator index", - expectedError: "failed to parse committee validator index `foo`", - generateCommittees: func() []*structs.Committee { - committees := generateValidCommittees(committeeIndices, committeeSlots, validatorIndices) - committees[0].Validators[0] = "foo" - return committees - }, - }, - { - name: "committee index and slot not found in committees mapping", - expectedError: "failed to find validators for committee index `1` and slot `2`", - generateAttesterDuties: func() []*structs.AttesterDuty { - attesterDuties := generateValidAttesterDuties(pubkeys, validatorIndices, committeeIndices, committeeSlots) - attesterDuties[0].CommitteeIndex = "1" - attesterDuties[0].Slot = "2" - return attesterDuties - }, - generateCommittees: func() []*structs.Committee { - return []*structs.Committee{} - }, - }, } for _, testCase := range testCases { @@ -674,13 +627,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { syncDuties = testCase.generateSyncDuties() } - var committees []*structs.Committee - if testCase.generateCommittees == nil { - committees = generateValidCommittees(committeeIndices, committeeSlots, validatorIndices) - } else { - committees = testCase.generateCommittees() - } - dutiesProvider := mock.NewMockdutiesProvider(ctrl) dutiesProvider.EXPECT().AttesterDuties( ctx, @@ -708,14 +654,6 @@ func TestGetDutiesForEpoch_Error(t *testing.T) { testCase.fetchSyncDutiesError, ).AnyTimes() - dutiesProvider.EXPECT().Committees( - ctx, - epoch, - ).Return( - committees, - testCase.fetchCommitteesError, - ).AnyTimes() - vals := make([]validatorForDuty, len(pubkeys)) for i := 0; i < len(pubkeys); i++ { vals[i] = validatorForDuty{ @@ -767,13 +705,6 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { ctx := context.Background() dutiesProvider := mock.NewMockdutiesProvider(ctrl) - dutiesProvider.EXPECT().Committees( - ctx, - epoch, - ).Return( - generateValidCommittees(committeeIndices, committeeSlots, validatorIndices), - nil, - ).Times(1) dutiesProvider.EXPECT().AttesterDuties( ctx, @@ -958,7 +889,14 @@ func TestGetDutiesForEpoch_Valid(t *testing.T) { testCase.fetchSyncDuties, ) require.NoError(t, err) - assert.DeepEqual(t, expectedDuties, duties) + require.Equal(t, len(expectedDuties), len(duties)) + for i, duty := range expectedDuties { + assert.Equal(t, duty.CommitteeIndex, duties[i].CommitteeIndex) + assert.DeepEqual(t, duty.ProposerSlots, duties[i].ProposerSlots) + assert.Equal(t, duty.ValidatorIndex, duties[i].ValidatorIndex) + assert.Equal(t, duty.IsSyncCommittee, duties[i].IsSyncCommittee) + assert.Equal(t, duty.Status, duties[i].Status) + } }) } } @@ -1004,13 +942,6 @@ func TestGetDuties_Valid(t *testing.T) { ctx := context.Background() dutiesProvider := mock.NewMockdutiesProvider(ctrl) - dutiesProvider.EXPECT().Committees( - ctx, - testCase.epoch, - ).Return( - generateValidCommittees(committeeIndices, committeeSlots, validatorIndices), - nil, - ).Times(2) dutiesProvider.EXPECT().AttesterDuties( ctx, @@ -1041,14 +972,6 @@ func TestGetDuties_Valid(t *testing.T) { ).Times(2) } - dutiesProvider.EXPECT().Committees( - ctx, - testCase.epoch+1, - ).Return( - reverseSlice(generateValidCommittees(committeeIndices, committeeSlots, validatorIndices)), - nil, - ).Times(2) - dutiesProvider.EXPECT().AttesterDuties( ctx, testCase.epoch+1, @@ -1209,7 +1132,7 @@ func TestGetDuties_Valid(t *testing.T) { ) require.NoError(t, err) - expectedDuties := ðpb.DutiesResponse{ + expectedDuties := ðpb.ValidatorDutiesContainer{ CurrentEpochDuties: expectedCurrentEpochDuties, NextEpochDuties: expectedNextEpochDuties, } @@ -1298,10 +1221,6 @@ func TestGetDuties_GetDutiesForEpochFailed(t *testing.T) { ctx, gomock.Any(), ).Times(2) - dutiesProvider.EXPECT().Committees( - ctx, - gomock.Any(), - ).Times(2) validatorClient := &beaconApiValidatorClient{ stateValidatorsProvider: stateValidatorsProvider, @@ -1316,72 +1235,61 @@ func TestGetDuties_GetDutiesForEpochFailed(t *testing.T) { assert.ErrorContains(t, "foo error", err) } -func generateValidCommittees(committeeIndices []primitives.CommitteeIndex, slots []primitives.Slot, validatorIndices []primitives.ValidatorIndex) []*structs.Committee { - return []*structs.Committee{ - { - Index: strconv.FormatUint(uint64(committeeIndices[0]), 10), - Slot: strconv.FormatUint(uint64(slots[0]), 10), - Validators: []string{ - strconv.FormatUint(uint64(validatorIndices[0]), 10), - strconv.FormatUint(uint64(validatorIndices[1]), 10), - }, - }, - { - Index: strconv.FormatUint(uint64(committeeIndices[1]), 10), - Slot: strconv.FormatUint(uint64(slots[1]), 10), - Validators: []string{ - strconv.FormatUint(uint64(validatorIndices[2]), 10), - strconv.FormatUint(uint64(validatorIndices[3]), 10), - }, - }, - { - Index: strconv.FormatUint(uint64(committeeIndices[2]), 10), - Slot: strconv.FormatUint(uint64(slots[2]), 10), - Validators: []string{ - strconv.FormatUint(uint64(validatorIndices[4]), 10), - strconv.FormatUint(uint64(validatorIndices[5]), 10), - }, - }, - } -} - func generateValidAttesterDuties(pubkeys [][]byte, validatorIndices []primitives.ValidatorIndex, committeeIndices []primitives.CommitteeIndex, slots []primitives.Slot) []*structs.AttesterDuty { return []*structs.AttesterDuty{ { - Pubkey: hexutil.Encode(pubkeys[0]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[0]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[0]), 10), - Slot: strconv.FormatUint(uint64(slots[0]), 10), + Pubkey: hexutil.Encode(pubkeys[0]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[0]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[0]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[0]), 10), }, { - Pubkey: hexutil.Encode(pubkeys[1]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[1]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[0]), 10), - Slot: strconv.FormatUint(uint64(slots[0]), 10), + Pubkey: hexutil.Encode(pubkeys[1]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[1]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[0]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[0]), 10), }, { - Pubkey: hexutil.Encode(pubkeys[2]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[2]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[1]), 10), - Slot: strconv.FormatUint(uint64(slots[1]), 10), + Pubkey: hexutil.Encode(pubkeys[2]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[2]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[1]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[1]), 10), }, { - Pubkey: hexutil.Encode(pubkeys[3]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[3]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[1]), 10), - Slot: strconv.FormatUint(uint64(slots[1]), 10), + Pubkey: hexutil.Encode(pubkeys[3]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[3]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[1]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[1]), 10), }, { - Pubkey: hexutil.Encode(pubkeys[4]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[4]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[2]), 10), - Slot: strconv.FormatUint(uint64(slots[2]), 10), + Pubkey: hexutil.Encode(pubkeys[4]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[4]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[2]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[2]), 10), }, { - Pubkey: hexutil.Encode(pubkeys[5]), - ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[5]), 10), - CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[2]), 10), - Slot: strconv.FormatUint(uint64(slots[2]), 10), + Pubkey: hexutil.Encode(pubkeys[5]), + ValidatorIndex: strconv.FormatUint(uint64(validatorIndices[5]), 10), + CommitteeIndex: strconv.FormatUint(uint64(committeeIndices[2]), 10), + CommitteeLength: fmt.Sprintf("%d", len(committeeIndices)), + ValidatorCommitteeIndex: strconv.FormatUint(uint64(0), 10), + CommitteesAtSlot: strconv.FormatUint(uint64(10), 10), + Slot: strconv.FormatUint(uint64(slots[2]), 10), }, } } diff --git a/validator/client/beacon-api/subscribe_committee_subnets.go b/validator/client/beacon-api/subscribe_committee_subnets.go index aae7168262..cc89e9bb71 100644 --- a/validator/client/beacon-api/subscribe_committee_subnets.go +++ b/validator/client/beacon-api/subscribe_committee_subnets.go @@ -11,7 +11,7 @@ import ( ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) -func (c *beaconApiValidatorClient) subscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.DutiesResponse_Duty) error { +func (c *beaconApiValidatorClient) subscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.ValidatorDuty) error { if in == nil { return errors.New("committee subnets subscribe request is nil") } diff --git a/validator/client/beacon-api/subscribe_committee_subnets_test.go b/validator/client/beacon-api/subscribe_committee_subnets_test.go index 8657b76fb8..f805ced961 100644 --- a/validator/client/beacon-api/subscribe_committee_subnets_test.go +++ b/validator/client/beacon-api/subscribe_committee_subnets_test.go @@ -76,7 +76,7 @@ func TestSubscribeCommitteeSubnets_Valid(t *testing.T) { CommitteeIds: committeeIndices, IsAggregator: isAggregator, }, - []*ethpb.DutiesResponse_Duty{ + []*ethpb.ValidatorDuty{ { ValidatorIndex: validatorIndices[0], CommitteesAtSlot: committeesAtSlot[0], @@ -100,7 +100,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { testCases := []struct { name string subscribeRequest *ethpb.CommitteeSubnetsSubscribeRequest - duties []*ethpb.DutiesResponse_Duty + duties []*ethpb.ValidatorDuty expectSubscribeRestCall bool expectedErrorMessage string }{ @@ -116,7 +116,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { Slots: []primitives.Slot{1, 2}, IsAggregator: []bool{false, true}, }, - duties: []*ethpb.DutiesResponse_Duty{ + duties: []*ethpb.ValidatorDuty{ { ValidatorIndex: 1, CommitteesAtSlot: 1, @@ -135,7 +135,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { Slots: []primitives.Slot{1}, IsAggregator: []bool{false, true}, }, - duties: []*ethpb.DutiesResponse_Duty{ + duties: []*ethpb.ValidatorDuty{ { ValidatorIndex: 1, CommitteesAtSlot: 1, @@ -154,7 +154,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { Slots: []primitives.Slot{1, 2}, IsAggregator: []bool{false}, }, - duties: []*ethpb.DutiesResponse_Duty{ + duties: []*ethpb.ValidatorDuty{ { ValidatorIndex: 1, CommitteesAtSlot: 1, @@ -173,7 +173,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { Slots: []primitives.Slot{1, 2}, IsAggregator: []bool{false, true}, }, - duties: []*ethpb.DutiesResponse_Duty{ + duties: []*ethpb.ValidatorDuty{ { ValidatorIndex: 1, CommitteesAtSlot: 1, @@ -188,7 +188,7 @@ func TestSubscribeCommitteeSubnets_Error(t *testing.T) { CommitteeIds: []primitives.CommitteeIndex{2}, IsAggregator: []bool{false}, }, - duties: []*ethpb.DutiesResponse_Duty{ + duties: []*ethpb.ValidatorDuty{ { ValidatorIndex: 1, CommitteesAtSlot: 1, diff --git a/validator/client/grpc-api/BUILD.bazel b/validator/client/grpc-api/BUILD.bazel index 3dd77c8311..0225016f29 100644 --- a/validator/client/grpc-api/BUILD.bazel +++ b/validator/client/grpc-api/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//beacon-chain/state/state-native:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/validator:go_default_library", + "//encoding/bytesutil:go_default_library", "//monitoring/tracing/trace:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/validator/client/grpc-api/grpc_validator_client.go b/validator/client/grpc-api/grpc_validator_client.go index 4e9cdd4d3c..2b311dc88a 100644 --- a/validator/client/grpc-api/grpc_validator_client.go +++ b/validator/client/grpc-api/grpc_validator_client.go @@ -11,6 +11,7 @@ import ( eventClient "github.com/prysmaticlabs/prysm/v5/api/client/event" "github.com/prysmaticlabs/prysm/v5/api/server/structs" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/validator/client/iface" @@ -23,8 +24,59 @@ type grpcValidatorClient struct { isEventStreamRunning bool } -func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { - return c.beaconNodeValidatorClient.GetDuties(ctx, in) +func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) { + dutiesResponse, err := c.beaconNodeValidatorClient.GetDuties(ctx, in) + if err != nil { + return nil, err + } + return toValidatorDutiesContainer(dutiesResponse) +} + +func toValidatorDutiesContainer(dutiesResponse *ethpb.DutiesResponse) (*ethpb.ValidatorDutiesContainer, error) { + currentDuties := make([]*ethpb.ValidatorDuty, len(dutiesResponse.CurrentEpochDuties)) + for i, cd := range dutiesResponse.CurrentEpochDuties { + duty, err := toValidatorDuty(cd) + if err != nil { + return nil, err + } + currentDuties[i] = duty + } + nextDuties := make([]*ethpb.ValidatorDuty, len(dutiesResponse.NextEpochDuties)) + for i, nd := range dutiesResponse.NextEpochDuties { + duty, err := toValidatorDuty(nd) + if err != nil { + return nil, err + } + nextDuties[i] = duty + } + return ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: currentDuties, + NextEpochDuties: nextDuties, + }, nil +} + +func toValidatorDuty(duty *ethpb.DutiesResponse_Duty) (*ethpb.ValidatorDuty, error) { + var valIndexInCommittee uint64 + // valIndexInCommittee will be 0 in case we don't get a match. This is a potential false positive, + // however it's an impossible condition because every validator must be assigned to a committee. + for cIndex, vIndex := range duty.Committee { + if vIndex == duty.ValidatorIndex { + valIndexInCommittee = uint64(cIndex) + break + } + } + return ðpb.ValidatorDuty{ + CommitteeLength: uint64(len(duty.Committee)), + CommitteeIndex: duty.CommitteeIndex, + CommitteesAtSlot: duty.CommitteesAtSlot, // GRPC doesn't use this value though + ValidatorCommitteeIndex: valIndexInCommittee, + AttesterSlot: duty.AttesterSlot, + ProposerSlots: duty.ProposerSlots, + PublicKey: bytesutil.SafeCopyBytes(duty.PublicKey), + Status: duty.Status, + ValidatorIndex: duty.ValidatorIndex, + IsSyncCommittee: duty.IsSyncCommittee, + }, nil } func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) { @@ -115,7 +167,7 @@ func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in) } -func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.DutiesResponse_Duty) (*empty.Empty, error) { +func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) { return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in) } diff --git a/validator/client/grpc-api/grpc_validator_client_test.go b/validator/client/grpc-api/grpc_validator_client_test.go index cae411ee1d..d055d4b17c 100644 --- a/validator/client/grpc-api/grpc_validator_client_test.go +++ b/validator/client/grpc-api/grpc_validator_client_test.go @@ -9,6 +9,7 @@ import ( eventClient "github.com/prysmaticlabs/prysm/v5/api/client/event" "github.com/prysmaticlabs/prysm/v5/api/server/structs" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" mock2 "github.com/prysmaticlabs/prysm/v5/testing/mock" @@ -18,6 +19,58 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +// toValidatorDutiesContainer is assumed to be available from your package, returning a *v1alpha1.ValidatorDutiesContainer. +func TestToValidatorDutiesContainer_HappyPath(t *testing.T) { + // Create a mock DutiesResponse with current and next duties. + dutiesResp := ð.DutiesResponse{ + CurrentEpochDuties: []*eth.DutiesResponse_Duty{ + { + Committee: []primitives.ValidatorIndex{100, 101}, + CommitteeIndex: 4, + AttesterSlot: 200, + ProposerSlots: []primitives.Slot{400}, + PublicKey: []byte{0xAA, 0xBB}, + Status: eth.ValidatorStatus_ACTIVE, + ValidatorIndex: 101, + IsSyncCommittee: false, + CommitteesAtSlot: 2, + }, + }, + NextEpochDuties: []*eth.DutiesResponse_Duty{ + { + Committee: []primitives.ValidatorIndex{300, 301}, + CommitteeIndex: 8, + AttesterSlot: 600, + ProposerSlots: []primitives.Slot{700, 701}, + PublicKey: []byte{0xCC, 0xDD}, + Status: eth.ValidatorStatus_ACTIVE, + ValidatorIndex: 301, + IsSyncCommittee: true, + CommitteesAtSlot: 3, + }, + }, + } + + gotContainer, err := toValidatorDutiesContainer(dutiesResp) + require.NoError(t, err) + + // Validate we have the correct number of duties in current and next epochs. + assert.Equal(t, len(gotContainer.CurrentEpochDuties), len(dutiesResp.CurrentEpochDuties)) + assert.Equal(t, len(gotContainer.NextEpochDuties), len(dutiesResp.NextEpochDuties)) + + firstCurrentDuty := gotContainer.CurrentEpochDuties[0] + expectedCurrentDuty := dutiesResp.CurrentEpochDuties[0] + assert.DeepEqual(t, firstCurrentDuty.PublicKey, expectedCurrentDuty.PublicKey) + assert.Equal(t, firstCurrentDuty.ValidatorIndex, expectedCurrentDuty.ValidatorIndex) + assert.DeepEqual(t, firstCurrentDuty.ProposerSlots, expectedCurrentDuty.ProposerSlots) + + firstNextDuty := gotContainer.NextEpochDuties[0] + expectedNextDuty := dutiesResp.NextEpochDuties[0] + assert.DeepEqual(t, firstNextDuty.PublicKey, expectedNextDuty.PublicKey) + assert.Equal(t, firstNextDuty.ValidatorIndex, expectedNextDuty.ValidatorIndex) + assert.DeepEqual(t, firstNextDuty.ProposerSlots, expectedNextDuty.ProposerSlots) +} + func TestWaitForChainStart_StreamSetupFails(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/validator/client/iface/validator_client.go b/validator/client/iface/validator_client.go index 5ff21ca648..861b27d378 100644 --- a/validator/client/iface/validator_client.go +++ b/validator/client/iface/validator_client.go @@ -121,7 +121,7 @@ func (s *SyncCommitteeSelection) UnmarshalJSON(input []byte) error { } type ValidatorClient interface { - Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) + Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) @@ -139,7 +139,7 @@ type ValidatorClient interface { SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) - SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.DutiesResponse_Duty) (*empty.Empty, error) + SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, duties []*ethpb.ValidatorDuty) (*empty.Empty, error) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) diff --git a/validator/client/service.go b/validator/client/service.go index 552b002931..f9a1ce3ecd 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -173,6 +173,7 @@ func (v *ValidatorService) Start() { log.WithError(err).Error("No API hosts provided") return } + restHandler := beaconApi.NewBeaconApiJsonRestHandler( http.Client{Timeout: v.conn.GetBeaconApiTimeout(), Transport: otelhttp.NewTransport(http.DefaultTransport)}, hosts[0], diff --git a/validator/client/sync_committee_test.go b/validator/client/sync_committee_test.go index 175c4ee06d..b70d65dc77 100644 --- a/validator/client/sync_committee_test.go +++ b/validator/client/sync_committee_test.go @@ -25,7 +25,7 @@ func TestSubmitSyncCommitteeMessage_ValidatorDutiesRequestFailure(t *testing.T) t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) { hook := logTest.NewGlobal() validator, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{}} + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{}} defer finish() m.validatorClient.EXPECT().SyncMessageBlockRoot( @@ -51,11 +51,11 @@ func TestSubmitSyncCommitteeMessage_BadDomainData(t *testing.T) { hook := logTest.NewGlobal() validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} @@ -87,11 +87,11 @@ func TestSubmitSyncCommitteeMessage_CouldNotSubmit(t *testing.T) { hook := logTest.NewGlobal() validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} @@ -132,11 +132,11 @@ func TestSubmitSyncCommitteeMessage_OK(t *testing.T) { hook := logTest.NewGlobal() validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} @@ -180,7 +180,7 @@ func TestSubmitSignedContributionAndProof_ValidatorDutiesRequestFailure(t *testi t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) { hook := logTest.NewGlobal() validator, _, validatorKey, finish := setup(t, isSlashingProtectionMinimal) - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{}} + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{}} defer finish() var pubKey [fieldparams.BLSPubkeyLength]byte @@ -198,11 +198,11 @@ func TestSubmitSignedContributionAndProof_SyncSubcommitteeIndexFailure(t *testin validator, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() @@ -230,11 +230,11 @@ func TestSubmitSignedContributionAndProof_NothingToDo(t *testing.T) { validator, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() @@ -262,11 +262,11 @@ func TestSubmitSignedContributionAndProof_BadDomain(t *testing.T) { validator, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() @@ -308,11 +308,11 @@ func TestSubmitSignedContributionAndProof_CouldNotGetContribution(t *testing.T) validator, m, validatorKey, finish := setupWithKey(t, validatorKey, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() @@ -362,11 +362,11 @@ func TestSubmitSignedContributionAndProof_CouldNotSubmitContribution(t *testing. validator, m, validatorKey, finish := setupWithKey(t, validatorKey, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() @@ -444,11 +444,11 @@ func TestSubmitSignedContributionAndProof_Ok(t *testing.T) { validator, m, validatorKey, finish := setupWithKey(t, validatorKey, isSlashingProtectionMinimal) validatorIndex := primitives.ValidatorIndex(7) committee := []primitives.ValidatorIndex{0, 3, 4, 2, validatorIndex, 6, 8, 9, 10} - validator.duties = ðpb.DutiesResponse{CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + validator.duties = ðpb.ValidatorDutiesContainer{CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - PublicKey: validatorKey.PublicKey().Marshal(), - Committee: committee, - ValidatorIndex: validatorIndex, + PublicKey: validatorKey.PublicKey().Marshal(), + CommitteeLength: uint64(len(committee)), + ValidatorIndex: validatorIndex, }, }} defer finish() diff --git a/validator/client/validator.go b/validator/client/validator.go index a342b7966d..f6cad923b3 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -65,7 +65,7 @@ var ( ) type validator struct { - duties *ethpb.DutiesResponse + duties *ethpb.ValidatorDutiesContainer ticker slots.Ticker genesisTime uint64 highestValidSlot primitives.Slot @@ -595,14 +595,14 @@ func (v *validator) UpdateDuties(ctx context.Context, slot primitives.Slot) erro // subscribeToSubnets iterates through each validator duty, signs each slot, and asks beacon node // to eagerly subscribe to subnets so that the aggregator has attestations to aggregate. -func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.DutiesResponse) error { +func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.ValidatorDutiesContainer) error { ctx, span := trace.StartSpan(ctx, "validator.subscribeToSubnets") defer span.End() subscribeSlots := make([]primitives.Slot, 0, len(duties.CurrentEpochDuties)+len(duties.NextEpochDuties)) subscribeCommitteeIndices := make([]primitives.CommitteeIndex, 0, len(duties.CurrentEpochDuties)+len(duties.NextEpochDuties)) subscribeIsAggregator := make([]bool, 0, len(duties.CurrentEpochDuties)+len(duties.NextEpochDuties)) - activeDuties := make([]*ethpb.DutiesResponse_Duty, 0, len(duties.CurrentEpochDuties)+len(duties.NextEpochDuties)) + activeDuties := make([]*ethpb.ValidatorDuty, 0, len(duties.CurrentEpochDuties)+len(duties.NextEpochDuties)) alreadySubscribed := make(map[[64]byte]bool) if v.distributed { @@ -624,7 +624,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.Duties continue } - aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, pk, validatorIndex) + aggregator, err := v.isAggregator(ctx, duty.CommitteeLength, attesterSlot, pk, validatorIndex) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } @@ -650,7 +650,7 @@ func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.Duties continue } - aggregator, err := v.isAggregator(ctx, duty.Committee, attesterSlot, bytesutil.ToBytes48(duty.PublicKey), validatorIndex) + aggregator, err := v.isAggregator(ctx, duty.CommitteeLength, attesterSlot, bytesutil.ToBytes48(duty.PublicKey), validatorIndex) if err != nil { return errors.Wrap(err, "could not check if a validator is an aggregator") } @@ -713,7 +713,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if duty.AttesterSlot == slot { roles = append(roles, iface.RoleAttester) - aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) + aggregator, err := v.isAggregator(ctx, duty.CommitteeLength, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) if err != nil { aggregator = false log.WithError(err).Errorf("Could not check if validator %#x is an aggregator", bytesutil.Trunc(duty.PublicKey)) @@ -792,7 +792,7 @@ func (v *validator) Keymanager() (keymanager.IKeymanager, error) { // it uses a modulo calculated by validator count in committee and samples randomness around it. func (v *validator) isAggregator( ctx context.Context, - committeeIndex []primitives.ValidatorIndex, + committeeLength uint64, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex, @@ -801,8 +801,8 @@ func (v *validator) isAggregator( defer span.End() modulo := uint64(1) - if len(committeeIndex)/int(params.BeaconConfig().TargetAggregatorsPerCommittee) > 1 { - modulo = uint64(len(committeeIndex)) / params.BeaconConfig().TargetAggregatorsPerCommittee + if committeeLength/params.BeaconConfig().TargetAggregatorsPerCommittee > 1 { + modulo = committeeLength / params.BeaconConfig().TargetAggregatorsPerCommittee } var ( @@ -955,7 +955,7 @@ func (v *validator) domainData(ctx context.Context, epoch primitives.Epoch, doma return res, nil } -func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.DutiesResponse_Duty, nextEpochDuties []*ethpb.DutiesResponse_Duty) { +func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.ValidatorDuty, nextEpochDuties []*ethpb.ValidatorDuty) { attesterKeys := make([][]string, params.BeaconConfig().SlotsPerEpoch) for i := range attesterKeys { attesterKeys[i] = make([]string, 0) @@ -1391,7 +1391,7 @@ func (v *validator) buildSignedRegReqs( return signedValRegRequests } -func (v *validator) aggregatedSelectionProofs(ctx context.Context, duties *ethpb.DutiesResponse) error { +func (v *validator) aggregatedSelectionProofs(ctx context.Context, duties *ethpb.ValidatorDutiesContainer) error { ctx, span := trace.StartSpan(ctx, "validator.aggregatedSelectionProofs") defer span.End() diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 9217347d9a..a3adc64221 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -394,10 +394,9 @@ func TestUpdateDuties_DoesNothingWhenNotEpochStart_AlreadyExistingAssignments(t slot := primitives.Slot(1) v := validator{ validatorClient: client, - duties: ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + duties: ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - Committee: []primitives.ValidatorIndex{}, AttesterSlot: 10, CommitteeIndex: 20, }, @@ -420,8 +419,8 @@ func TestUpdateDuties_ReturnsError(t *testing.T) { v := validator{ validatorClient: client, km: newMockKeymanager(t, randKeypair(t)), - duties: ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + duties: ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, }, @@ -437,7 +436,7 @@ func TestUpdateDuties_ReturnsError(t *testing.T) { ).Return(nil, expected) assert.ErrorContains(t, expected.Error(), v.UpdateDuties(context.Background(), params.BeaconConfig().SlotsPerEpoch)) - assert.Equal(t, (*ethpb.DutiesResponse)(nil), v.duties, "Assignments should have been cleared on failure") + assert.Equal(t, (*ethpb.ValidatorDutiesContainer)(nil), v.duties, "Assignments should have been cleared on failure") } func TestUpdateDuties_OK(t *testing.T) { @@ -446,15 +445,15 @@ func TestUpdateDuties_OK(t *testing.T) { client := validatormock.NewMockValidatorClient(ctrl) slot := params.BeaconConfig().SlotsPerEpoch - resp := ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + resp := ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - AttesterSlot: params.BeaconConfig().SlotsPerEpoch, - ValidatorIndex: 200, - CommitteeIndex: 100, - Committee: []primitives.ValidatorIndex{0, 1, 2, 3}, - PublicKey: []byte("testPubKey_1"), - ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, + AttesterSlot: params.BeaconConfig().SlotsPerEpoch, + ValidatorIndex: 200, + CommitteeIndex: 100, + CommitteeLength: 4, + PublicKey: []byte("testPubKey_1"), + ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, }, }, } @@ -474,7 +473,7 @@ func TestUpdateDuties_OK(t *testing.T) { gomock.Any(), gomock.Any(), gomock.Any(), - ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.DutiesResponse_Duty) (*emptypb.Empty, error) { + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*emptypb.Empty, error) { wg.Done() return nil, nil }) @@ -508,8 +507,8 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) { blacklistedPubkeys: blacklistedPublicKeys, } - resp := ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{}, + resp := ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{}, } client.EXPECT().Duties( gomock.Any(), @@ -522,7 +521,7 @@ func TestUpdateDuties_OK_FilterBlacklistedPublicKeys(t *testing.T) { gomock.Any(), gomock.Any(), gomock.Any(), - ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.DutiesResponse_Duty) (*emptypb.Empty, error) { + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*emptypb.Empty, error) { wg.Done() return nil, nil }) @@ -542,25 +541,25 @@ func TestUpdateDuties_AllValidatorsExited(t *testing.T) { client := validatormock.NewMockValidatorClient(ctrl) slot := params.BeaconConfig().SlotsPerEpoch - resp := ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + resp := ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { - AttesterSlot: params.BeaconConfig().SlotsPerEpoch, - ValidatorIndex: 200, - CommitteeIndex: 100, - Committee: []primitives.ValidatorIndex{0, 1, 2, 3}, - PublicKey: []byte("testPubKey_1"), - ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, - Status: ethpb.ValidatorStatus_EXITED, + AttesterSlot: params.BeaconConfig().SlotsPerEpoch, + ValidatorIndex: 200, + CommitteeIndex: 100, + CommitteeLength: 4, + PublicKey: []byte("testPubKey_1"), + ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, + Status: ethpb.ValidatorStatus_EXITED, }, { - AttesterSlot: params.BeaconConfig().SlotsPerEpoch, - ValidatorIndex: 201, - CommitteeIndex: 101, - Committee: []primitives.ValidatorIndex{0, 1, 2, 3}, - PublicKey: []byte("testPubKey_2"), - ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, - Status: ethpb.ValidatorStatus_EXITED, + AttesterSlot: params.BeaconConfig().SlotsPerEpoch, + ValidatorIndex: 201, + CommitteeIndex: 101, + CommitteeLength: 4, + PublicKey: []byte("testPubKey_2"), + ProposerSlots: []primitives.Slot{params.BeaconConfig().SlotsPerEpoch + 1}, + Status: ethpb.ValidatorStatus_EXITED, }, }, } @@ -586,8 +585,8 @@ func TestUpdateDuties_Distributed(t *testing.T) { // Start of third epoch. slot := 2 * params.BeaconConfig().SlotsPerEpoch keys := randKeypair(t) - resp := ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + resp := ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { AttesterSlot: slot, // First slot in epoch. ValidatorIndex: 200, @@ -596,7 +595,7 @@ func TestUpdateDuties_Distributed(t *testing.T) { Status: ethpb.ValidatorStatus_ACTIVE, }, }, - NextEpochDuties: []*ethpb.DutiesResponse_Duty{ + NextEpochDuties: []*ethpb.ValidatorDuty{ { AttesterSlot: slot + params.BeaconConfig().SlotsPerEpoch, // First slot in next epoch. ValidatorIndex: 200, @@ -654,7 +653,7 @@ func TestUpdateDuties_Distributed(t *testing.T) { gomock.Any(), gomock.Any(), gomock.Any(), - ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.DutiesResponse_Duty) (*emptypb.Empty, error) { + ).DoAndReturn(func(_ context.Context, _ *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*emptypb.Empty, error) { wg.Done() return nil, nil }) @@ -670,8 +669,8 @@ func TestRolesAt_OK(t *testing.T) { v, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) defer finish() - v.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + v.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, AttesterSlot: 1, @@ -679,7 +678,7 @@ func TestRolesAt_OK(t *testing.T) { IsSyncCommittee: true, }, }, - NextEpochDuties: []*ethpb.DutiesResponse_Duty{ + NextEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, AttesterSlot: 1, @@ -710,8 +709,8 @@ func TestRolesAt_OK(t *testing.T) { assert.Equal(t, iface.RoleSyncCommittee, roleMap[bytesutil.ToBytes48(validatorKey.PublicKey().Marshal())][2]) // Test sync committee role at epoch boundary. - v.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + v.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, AttesterSlot: 1, @@ -719,7 +718,7 @@ func TestRolesAt_OK(t *testing.T) { IsSyncCommittee: false, }, }, - NextEpochDuties: []*ethpb.DutiesResponse_Duty{ + NextEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, AttesterSlot: 1, @@ -750,8 +749,8 @@ func TestRolesAt_DoesNotAssignProposer_Slot0(t *testing.T) { v, m, validatorKey, finish := setup(t, isSlashingProtectionMinimal) defer finish() - v.duties = ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + v.duties = ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, AttesterSlot: 0, @@ -865,8 +864,8 @@ func TestCheckAndLogValidatorStatus_OK(t *testing.T) { client := validatormock.NewMockValidatorClient(ctrl) v := validator{ validatorClient: client, - duties: ðpb.DutiesResponse{ - CurrentEpochDuties: []*ethpb.DutiesResponse_Duty{ + duties: ðpb.ValidatorDutiesContainer{ + CurrentEpochDuties: []*ethpb.ValidatorDuty{ { CommitteeIndex: 1, },