Register sync subnet when fetching sync committee duties through Beacon API (#12972)

* Register sync subnet when fetching sync committee duties through Beacon API

* review

* oops

---------

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Radosław Kapka
2023-10-16 19:20:27 +02:00
committed by GitHub
parent 2f378a045a
commit b52baba2f1
7 changed files with 247 additions and 107 deletions

View File

@@ -1,4 +1,4 @@
load("@prysm//tools/go:def.bzl", "go_library")
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -22,6 +22,7 @@ go_library(
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/fieldparams:go_default_library",
@@ -41,3 +42,17 @@ go_library(
"@org_golang_x_sync//errgroup:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["validator_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/cache:go_default_library",
"//config/params:go_default_library",
"//consensus-types/validator:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
beaconState "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
@@ -486,3 +487,122 @@ func (s *Service) SubmitSyncMessage(ctx context.Context, msg *ethpb.SyncCommitte
}
return nil
}
// RegisterSyncSubnetCurrentPeriod registers a persistent subnet for the current sync committee period.
func RegisterSyncSubnetCurrentPeriod(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status validator.ValidatorStatus) error {
committee, err := s.CurrentSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnet(epoch, syncCommPeriod, pubKey, committee, status)
return nil
}
// RegisterSyncSubnetCurrentPeriodProto registers a persistent subnet for the current sync committee period.
func RegisterSyncSubnetCurrentPeriodProto(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status ethpb.ValidatorStatus) error {
committee, err := s.CurrentSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnetProto(epoch, syncCommPeriod, pubKey, committee, status)
return nil
}
// RegisterSyncSubnetNextPeriod registers a persistent subnet for the next sync committee period.
func RegisterSyncSubnetNextPeriod(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status validator.ValidatorStatus) error {
committee, err := s.NextSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnet(epoch, syncCommPeriod+1, pubKey, committee, status)
return nil
}
// RegisterSyncSubnetNextPeriodProto registers a persistent subnet for the next sync committee period.
func RegisterSyncSubnetNextPeriodProto(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status ethpb.ValidatorStatus) error {
committee, err := s.NextSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnetProto(epoch, syncCommPeriod+1, pubKey, committee, status)
return nil
}
// registerSyncSubnet checks the status and pubkey of a particular validator
// to discern whether persistent subnets need to be registered for them.
func registerSyncSubnet(
currEpoch primitives.Epoch,
syncPeriod uint64,
pubkey []byte,
syncCommittee *ethpb.SyncCommittee,
status validator.ValidatorStatus,
) {
if status != validator.Active && status != validator.ActiveExiting {
return
}
registerSyncSubnetInternal(currEpoch, syncPeriod, pubkey, syncCommittee)
}
func registerSyncSubnetProto(
currEpoch primitives.Epoch,
syncPeriod uint64,
pubkey []byte,
syncCommittee *ethpb.SyncCommittee,
status ethpb.ValidatorStatus,
) {
if status != ethpb.ValidatorStatus_ACTIVE && status != ethpb.ValidatorStatus_EXITING {
return
}
registerSyncSubnetInternal(currEpoch, syncPeriod, pubkey, syncCommittee)
}
func registerSyncSubnetInternal(
currEpoch primitives.Epoch,
syncPeriod uint64,
pubkey []byte,
syncCommittee *ethpb.SyncCommittee,
) {
startEpoch := primitives.Epoch(syncPeriod * uint64(params.BeaconConfig().EpochsPerSyncCommitteePeriod))
currPeriod := slots.SyncCommitteePeriod(currEpoch)
endEpoch := startEpoch + params.BeaconConfig().EpochsPerSyncCommitteePeriod
_, _, ok, expTime := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(pubkey, startEpoch)
if ok && expTime.After(prysmTime.Now()) {
return
}
firstValidEpoch, err := startEpoch.SafeSub(params.BeaconConfig().SyncCommitteeSubnetCount)
if err != nil {
firstValidEpoch = 0
}
// If we are processing for a future period, we only
// add to the relevant subscription once we are at the valid
// bound.
if syncPeriod != currPeriod && currEpoch < firstValidEpoch {
return
}
subs := subnetsFromCommittee(pubkey, syncCommittee)
// Handle overflow in the event current epoch is less
// than end epoch. This is an impossible condition, so
// it is a defensive check.
epochsToWatch, err := endEpoch.SafeSub(uint64(currEpoch))
if err != nil {
epochsToWatch = 0
}
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
totalDuration := epochDuration * time.Duration(epochsToWatch) * time.Second
cache.SyncSubnetIDs.AddSyncCommitteeSubnets(pubkey, startEpoch, subs, totalDuration)
}
// subnetsFromCommittee retrieves the relevant subnets for the chosen validator.
func subnetsFromCommittee(pubkey []byte, comm *ethpb.SyncCommittee) []uint64 {
positions := make([]uint64, 0)
for i, pkey := range comm.Pubkeys {
if bytes.Equal(pubkey, pkey) {
positions = append(positions, uint64(i)/(params.BeaconConfig().SyncCommitteeSize/params.BeaconConfig().SyncCommitteeSubnetCount))
}
}
return positions
}

View File

@@ -0,0 +1,65 @@
package core
import (
"encoding/binary"
"testing"
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
func TestRegisterSyncSubnetProto(t *testing.T) {
k := pubKey(3)
committee := make([][]byte, 0)
for i := 0; i < 100; i++ {
committee = append(committee, pubKey(uint64(i)))
}
sCommittee := &ethpb.SyncCommittee{
Pubkeys: committee,
}
registerSyncSubnetProto(0, 0, k, sCommittee, ethpb.ValidatorStatus_ACTIVE)
coms, _, ok, exp := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(k, 0)
require.Equal(t, true, ok, "No cache entry found for validator")
assert.Equal(t, uint64(1), uint64(len(coms)))
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
totalTime := time.Duration(params.BeaconConfig().EpochsPerSyncCommitteePeriod) * epochDuration * time.Second
receivedTime := time.Until(exp.Round(time.Second)).Round(time.Second)
if receivedTime < totalTime {
t.Fatalf("Expiration time of %f was less than expected duration of %f ", receivedTime.Seconds(), totalTime.Seconds())
}
}
func TestRegisterSyncSubnet(t *testing.T) {
k := pubKey(3)
committee := make([][]byte, 0)
for i := 0; i < 100; i++ {
committee = append(committee, pubKey(uint64(i)))
}
sCommittee := &ethpb.SyncCommittee{
Pubkeys: committee,
}
registerSyncSubnet(0, 0, k, sCommittee, validator.Active)
coms, _, ok, exp := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(k, 0)
require.Equal(t, true, ok, "No cache entry found for validator")
assert.Equal(t, uint64(1), uint64(len(coms)))
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
totalTime := time.Duration(params.BeaconConfig().EpochsPerSyncCommitteePeriod) * epochDuration * time.Second
receivedTime := time.Until(exp.Round(time.Second)).Round(time.Second)
if receivedTime < totalTime {
t.Fatalf("Expiration time of %f was less than expected duration of %f ", receivedTime.Seconds(), totalTime.Seconds())
}
}
// pubKey is a helper to generate a well-formed public key.
func pubKey(i uint64) []byte {
pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength)
binary.LittleEndian.PutUint64(pubKey, i)
return pubKey
}

View File

@@ -943,15 +943,16 @@ func (s *Server) GetSyncCommitteeDuties(w http.ResponseWriter, r *http.Request)
return
}
nextSyncCommitteeFirstEpoch := currentSyncCommitteeFirstEpoch + params.BeaconConfig().EpochsPerSyncCommitteePeriod
isCurrentCommitteeRequested := requestedEpoch < nextSyncCommitteeFirstEpoch
var committee *ethpbalpha.SyncCommittee
if requestedEpoch >= nextSyncCommitteeFirstEpoch {
committee, err = st.NextSyncCommittee()
if isCurrentCommitteeRequested {
committee, err = st.CurrentSyncCommittee()
if err != nil {
http2.HandleError(w, "Could not get sync committee: "+err.Error(), http.StatusInternalServerError)
return
}
} else {
committee, err = st.CurrentSyncCommittee()
committee, err = st.NextSyncCommittee()
if err != nil {
http2.HandleError(w, "Could not get sync committee: "+err.Error(), http.StatusInternalServerError)
return
@@ -962,12 +963,31 @@ func (s *Server) GetSyncCommitteeDuties(w http.ResponseWriter, r *http.Request)
pubkey48 := bytesutil.ToBytes48(pubkey)
committeePubkeys[pubkey48] = append(committeePubkeys[pubkey48], strconv.FormatUint(uint64(j), 10))
}
duties, err := syncCommitteeDuties(requestedValIndices, st, committeePubkeys)
duties, vals, err := syncCommitteeDutiesAndVals(st, requestedValIndices, committeePubkeys)
if err != nil {
http2.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
var registerSyncSubnet func(state.BeaconState, primitives.Epoch, []byte, validator2.ValidatorStatus) error
if isCurrentCommitteeRequested {
registerSyncSubnet = core.RegisterSyncSubnetCurrentPeriod
} else {
registerSyncSubnet = core.RegisterSyncSubnetNextPeriod
}
for _, v := range vals {
pk := v.PublicKey()
valStatus, err := rpchelpers.ValidatorStatus(v, requestedEpoch)
if err != nil {
http2.HandleError(w, "Could not get validator status: "+err.Error(), http.StatusInternalServerError)
return
}
if err := registerSyncSubnet(st, requestedEpoch, pk[:], valStatus); err != nil {
http2.HandleError(w, fmt.Sprintf("Could not register sync subnet for pubkey %#x", pk), http.StatusInternalServerError)
return
}
}
isOptimistic, err := s.OptimisticModeFetcher.IsOptimistic(ctx)
if err != nil {
http2.HandleError(w, "Could not check optimistic status: "+err.Error(), http.StatusInternalServerError)
@@ -1134,29 +1154,38 @@ func syncCommitteeDutiesLastValidEpoch(currentEpoch primitives.Epoch) primitives
return (currentSyncPeriodIndex+2)*params.BeaconConfig().EpochsPerSyncCommitteePeriod - 1
}
func syncCommitteeDuties(
valIndices []primitives.ValidatorIndex,
// syncCommitteeDutiesAndVals takes a list of requested validator indices and the actual sync committee pubkeys.
// It returns duties for the validator indices that are part of the sync committee.
// Additionally, it returns read-only validator objects for these validator indices.
func syncCommitteeDutiesAndVals(
st state.BeaconState,
requestedValIndices []primitives.ValidatorIndex,
committeePubkeys map[[fieldparams.BLSPubkeyLength]byte][]string,
) ([]*SyncCommitteeDuty, error) {
) ([]*SyncCommitteeDuty, []state.ReadOnlyValidator, error) {
duties := make([]*SyncCommitteeDuty, 0)
for _, index := range valIndices {
vals := make([]state.ReadOnlyValidator, 0)
for _, index := range requestedValIndices {
duty := &SyncCommitteeDuty{
ValidatorIndex: strconv.FormatUint(uint64(index), 10),
}
valPubkey := st.PubkeyAtIndex(index)
var zeroPubkey [fieldparams.BLSPubkeyLength]byte
if bytes.Equal(valPubkey[:], zeroPubkey[:]) {
return nil, errors.Errorf("Invalid validator index %d", index)
return nil, nil, errors.Errorf("Invalid validator index %d", index)
}
duty.Pubkey = hexutil.Encode(valPubkey[:])
indices, ok := committeePubkeys[valPubkey]
if ok {
duty.ValidatorSyncCommitteeIndices = indices
duties = append(duties, duty)
v, err := st.ValidatorAtIndexReadOnly(index)
if err != nil {
return nil, nil, fmt.Errorf("could not get validator at index %d", index)
}
vals = append(vals, v)
}
}
return duties, nil
return duties, vals, nil
}
func sortProposerDuties(w http.ResponseWriter, duties []*ProposerDuty) bool {

View File

@@ -2079,6 +2079,8 @@ func TestGetSyncCommitteeDuties(t *testing.T) {
}
t.Run("single validator", func(t *testing.T) {
cache.SyncSubnetIDs.EmptyAllCaches()
var body bytes.Buffer
_, err := body.WriteString("[\"1\"]")
require.NoError(t, err)
@@ -2097,6 +2099,9 @@ func TestGetSyncCommitteeDuties(t *testing.T) {
assert.Equal(t, "1", duty.ValidatorIndex)
require.Equal(t, 1, len(duty.ValidatorSyncCommitteeIndices))
assert.Equal(t, "1", duty.ValidatorSyncCommitteeIndices[0])
subnetId, _, ok, _ := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(vals[1].PublicKey, 0)
require.Equal(t, true, ok)
assert.Equal(t, 1, len(subnetId))
})
t.Run("multiple validators", func(t *testing.T) {
var body bytes.Buffer

View File

@@ -1,18 +1,14 @@
package validator
import (
"bytes"
"context"
"time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/core"
beaconState "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -205,7 +201,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
return nil, status.Errorf(codes.Internal, "Could not determine current epoch sync committee: %v", err)
}
if assignment.IsSyncCommittee {
if err := registerSyncSubnetCurrentPeriod(s, req.Epoch, pubKey, assignment.Status); err != nil {
if err := core.RegisterSyncSubnetCurrentPeriodProto(s, req.Epoch, pubKey, assignment.Status); err != nil {
return nil, err
}
}
@@ -222,7 +218,7 @@ func (vs *Server) duties(ctx context.Context, req *ethpb.DutiesRequest) (*ethpb.
return nil, status.Errorf(codes.Internal, "Could not determine next epoch sync committee: %v", err)
}
if nextAssignment.IsSyncCommittee {
if err := registerSyncSubnetNextPeriod(s, req.Epoch, pubKey, nextAssignment.Status); err != nil {
if err := core.RegisterSyncSubnetNextPeriodProto(s, req.Epoch, pubKey, nextAssignment.Status); err != nil {
return nil, err
}
}
@@ -251,71 +247,3 @@ func (vs *Server) AssignValidatorToSubnet(_ context.Context, req *ethpb.AssignVa
core.AssignValidatorToSubnetProto(req.PublicKey, req.Status)
return &emptypb.Empty{}, nil
}
func registerSyncSubnetCurrentPeriod(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status ethpb.ValidatorStatus) error {
committee, err := s.CurrentSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnet(epoch, syncCommPeriod, pubKey, committee, status)
return nil
}
func registerSyncSubnetNextPeriod(s beaconState.BeaconState, epoch primitives.Epoch, pubKey []byte, status ethpb.ValidatorStatus) error {
committee, err := s.NextSyncCommittee()
if err != nil {
return err
}
syncCommPeriod := slots.SyncCommitteePeriod(epoch)
registerSyncSubnet(epoch, syncCommPeriod+1, pubKey, committee, status)
return nil
}
// registerSyncSubnet checks the status and pubkey of a particular validator
// to discern whether persistent subnets need to be registered for them.
func registerSyncSubnet(currEpoch primitives.Epoch, syncPeriod uint64, pubkey []byte,
syncCommittee *ethpb.SyncCommittee, status ethpb.ValidatorStatus) {
if status != ethpb.ValidatorStatus_ACTIVE && status != ethpb.ValidatorStatus_EXITING {
return
}
startEpoch := primitives.Epoch(syncPeriod * uint64(params.BeaconConfig().EpochsPerSyncCommitteePeriod))
currPeriod := slots.SyncCommitteePeriod(currEpoch)
endEpoch := startEpoch + params.BeaconConfig().EpochsPerSyncCommitteePeriod
_, _, ok, expTime := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(pubkey, startEpoch)
if ok && expTime.After(prysmTime.Now()) {
return
}
firstValidEpoch, err := startEpoch.SafeSub(params.BeaconConfig().SyncCommitteeSubnetCount)
if err != nil {
firstValidEpoch = 0
}
// If we are processing for a future period, we only
// add to the relevant subscription once we are at the valid
// bound.
if syncPeriod != currPeriod && currEpoch < firstValidEpoch {
return
}
subs := subnetsFromCommittee(pubkey, syncCommittee)
// Handle overflow in the event current epoch is less
// than end epoch. This is an impossible condition, so
// it is a defensive check.
epochsToWatch, err := endEpoch.SafeSub(uint64(currEpoch))
if err != nil {
epochsToWatch = 0
}
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
totalDuration := epochDuration * time.Duration(epochsToWatch) * time.Second
cache.SyncSubnetIDs.AddSyncCommitteeSubnets(pubkey, startEpoch, subs, totalDuration)
}
// subnetsFromCommittee retrieves the relevant subnets for the chosen validator.
func subnetsFromCommittee(pubkey []byte, comm *ethpb.SyncCommittee) []uint64 {
positions := make([]uint64, 0)
for i, pkey := range comm.Pubkeys {
if bytes.Equal(pubkey, pkey) {
positions = append(positions, uint64(i)/(params.BeaconConfig().SyncCommitteeSize/params.BeaconConfig().SyncCommitteeSubnetCount))
}
}
return positions
}

View File

@@ -619,28 +619,6 @@ func TestAssignValidatorToSubnet(t *testing.T) {
}
}
func TestAssignValidatorToSyncSubnet(t *testing.T) {
k := pubKey(3)
committee := make([][]byte, 0)
for i := 0; i < 100; i++ {
committee = append(committee, pubKey(uint64(i)))
}
sCommittee := &ethpb.SyncCommittee{
Pubkeys: committee,
}
registerSyncSubnet(0, 0, k, sCommittee, ethpb.ValidatorStatus_ACTIVE)
coms, _, ok, exp := cache.SyncSubnetIDs.GetSyncCommitteeSubnets(k, 0)
require.Equal(t, true, ok, "No cache entry found for validator")
assert.Equal(t, uint64(1), uint64(len(coms)))
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
totalTime := time.Duration(params.BeaconConfig().EpochsPerSyncCommitteePeriod) * epochDuration * time.Second
receivedTime := time.Until(exp.Round(time.Second)).Round(time.Second)
if receivedTime < totalTime {
t.Fatalf("Expiration time of %f was less than expected duration of %f ", receivedTime.Seconds(), totalTime.Seconds())
}
}
func BenchmarkCommitteeAssignment(b *testing.B) {
genesis := util.NewBeaconBlock()