mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Aggregate and pack sync committee messages (#15608)
* Aggregate and pack sync committee messages * test * simplify error check * changelog <3 * fix assert import * remove parallelization * use sync committee cache * ignore bits already set in pool messages * fuzz fix * cleanup * test panic fix * clear cache in tests
This commit is contained in:
60
beacon-chain/cache/sync_committee.go
vendored
60
beacon-chain/cache/sync_committee.go
vendored
@@ -67,6 +67,30 @@ func (s *SyncCommitteeCache) Clear() {
|
|||||||
s.cache = cache.NewFIFO(keyFn)
|
s.cache = cache.NewFIFO(keyFn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentPeriodPositions returns current period positions of validator indices with respect with
|
||||||
|
// sync committee. If any input validator index has no assignment, an empty list will be returned
|
||||||
|
// for that validator. If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned.
|
||||||
|
// Manual checking of state for index position in state is recommended when `ErrNonExistingSyncCommitteeKey` is returned.
|
||||||
|
func (s *SyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
|
||||||
|
s.lock.RLock()
|
||||||
|
defer s.lock.RUnlock()
|
||||||
|
|
||||||
|
pos, err := s.positionsInCommittee(root, indices)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
result := make([][]primitives.CommitteeIndex, len(pos))
|
||||||
|
for i, p := range pos {
|
||||||
|
if p == nil {
|
||||||
|
result[i] = []primitives.CommitteeIndex{}
|
||||||
|
} else {
|
||||||
|
result[i] = p.currentPeriod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CurrentPeriodIndexPosition returns current period index position of a validator index with respect with
|
// CurrentPeriodIndexPosition returns current period index position of a validator index with respect with
|
||||||
// sync committee. If the input validator index has no assignment, an empty list will be returned.
|
// sync committee. If the input validator index has no assignment, an empty list will be returned.
|
||||||
// If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned.
|
// If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned.
|
||||||
@@ -104,11 +128,7 @@ func (s *SyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx primi
|
|||||||
return pos.nextPeriod, nil
|
return pos.nextPeriod, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping
|
func (s *SyncCommitteeCache) positionsInCommittee(root [32]byte, indices []primitives.ValidatorIndex) ([]*positionInCommittee, error) {
|
||||||
// of validator index to its index(s) position in the sync committee.
|
|
||||||
func (s *SyncCommitteeCache) idxPositionInCommittee(
|
|
||||||
root [32]byte, valIdx primitives.ValidatorIndex,
|
|
||||||
) (*positionInCommittee, error) {
|
|
||||||
obj, exists, err := s.cache.GetByKey(key(root))
|
obj, exists, err := s.cache.GetByKey(key(root))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -121,13 +141,33 @@ func (s *SyncCommitteeCache) idxPositionInCommittee(
|
|||||||
if !ok {
|
if !ok {
|
||||||
return nil, errNotSyncCommitteeIndexPosition
|
return nil, errNotSyncCommitteeIndexPosition
|
||||||
}
|
}
|
||||||
idxInCommittee, ok := item.vIndexToPositionMap[valIdx]
|
result := make([]*positionInCommittee, len(indices))
|
||||||
if !ok {
|
for i, idx := range indices {
|
||||||
SyncCommitteeCacheMiss.Inc()
|
idxInCommittee, ok := item.vIndexToPositionMap[idx]
|
||||||
|
if ok {
|
||||||
|
SyncCommitteeCacheHit.Inc()
|
||||||
|
result[i] = idxInCommittee
|
||||||
|
} else {
|
||||||
|
SyncCommitteeCacheMiss.Inc()
|
||||||
|
result[i] = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping
|
||||||
|
// of validator index to its index(s) position in the sync committee.
|
||||||
|
func (s *SyncCommitteeCache) idxPositionInCommittee(
|
||||||
|
root [32]byte, valIdx primitives.ValidatorIndex,
|
||||||
|
) (*positionInCommittee, error) {
|
||||||
|
positions, err := s.positionsInCommittee(root, []primitives.ValidatorIndex{valIdx})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(positions) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
SyncCommitteeCacheHit.Inc()
|
return positions[0], nil
|
||||||
return idxInCommittee, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdatePositionsInCommittee updates caching of validators position in sync committee in respect to
|
// UpdatePositionsInCommittee updates caching of validators position in sync committee in respect to
|
||||||
|
|||||||
@@ -16,6 +16,11 @@ func NewSyncCommittee() *FakeSyncCommitteeCache {
|
|||||||
return &FakeSyncCommitteeCache{}
|
return &FakeSyncCommitteeCache{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CurrentPeriodPositions -- fake
|
||||||
|
func (s *FakeSyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CurrentEpochIndexPosition -- fake.
|
// CurrentEpochIndexPosition -- fake.
|
||||||
func (s *FakeSyncCommitteeCache) CurrentPeriodIndexPosition(root [32]byte, valIdx primitives.ValidatorIndex) ([]primitives.CommitteeIndex, error) {
|
func (s *FakeSyncCommitteeCache) CurrentPeriodIndexPosition(root [32]byte, valIdx primitives.ValidatorIndex) ([]primitives.CommitteeIndex, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|||||||
@@ -21,6 +21,39 @@ var (
|
|||||||
syncCommitteeCache = cache.NewSyncCommittee()
|
syncCommitteeCache = cache.NewSyncCommittee()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// CurrentPeriodPositions returns committee indices of the current period sync committee for input validators.
|
||||||
|
func CurrentPeriodPositions(st state.BeaconState, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) {
|
||||||
|
root, err := SyncPeriodBoundaryRoot(st)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pos, err := syncCommitteeCache.CurrentPeriodPositions(root, indices)
|
||||||
|
if errors.Is(err, cache.ErrNonExistingSyncCommitteeKey) {
|
||||||
|
committee, err := st.CurrentSyncCommittee()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill in the cache on miss.
|
||||||
|
go func() {
|
||||||
|
if err := syncCommitteeCache.UpdatePositionsInCommittee(root, st); err != nil {
|
||||||
|
log.WithError(err).Error("Could not fill sync committee cache on miss")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
pos = make([][]primitives.CommitteeIndex, len(indices))
|
||||||
|
for i, idx := range indices {
|
||||||
|
pubkey := st.PubkeyAtIndex(idx)
|
||||||
|
pos[i] = findSubCommitteeIndices(pubkey[:], committee.Pubkeys)
|
||||||
|
}
|
||||||
|
return pos, nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return pos, nil
|
||||||
|
}
|
||||||
|
|
||||||
// IsCurrentPeriodSyncCommittee returns true if the input validator index belongs in the current period sync committee
|
// IsCurrentPeriodSyncCommittee returns true if the input validator index belongs in the current period sync committee
|
||||||
// along with the sync committee root.
|
// along with the sync committee root.
|
||||||
// 1. Checks if the public key exists in the sync committee cache
|
// 1. Checks if the public key exists in the sync committee cache
|
||||||
|
|||||||
@@ -17,6 +17,38 @@ import (
|
|||||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestCurrentPeriodPositions(t *testing.T) {
|
||||||
|
helpers.ClearCache()
|
||||||
|
|
||||||
|
validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
|
||||||
|
syncCommittee := ðpb.SyncCommittee{
|
||||||
|
Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize),
|
||||||
|
}
|
||||||
|
for i := 0; i < len(validators); i++ {
|
||||||
|
k := make([]byte, 48)
|
||||||
|
copy(k, strconv.Itoa(i))
|
||||||
|
validators[i] = ðpb.Validator{
|
||||||
|
PublicKey: k,
|
||||||
|
}
|
||||||
|
syncCommittee.Pubkeys[i] = bytesutil.PadTo(k, 48)
|
||||||
|
}
|
||||||
|
state, err := state_native.InitializeFromProtoAltair(ðpb.BeaconStateAltair{
|
||||||
|
Validators: validators,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, state.SetCurrentSyncCommittee(syncCommittee))
|
||||||
|
require.NoError(t, state.SetNextSyncCommittee(syncCommittee))
|
||||||
|
require.NoError(t, err, helpers.SyncCommitteeCache().UpdatePositionsInCommittee([32]byte{}, state))
|
||||||
|
|
||||||
|
positions, err := helpers.CurrentPeriodPositions(state, []primitives.ValidatorIndex{0, 1})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, len(positions))
|
||||||
|
require.Equal(t, 1, len(positions[0]))
|
||||||
|
assert.Equal(t, primitives.CommitteeIndex(0), positions[0][0])
|
||||||
|
require.Equal(t, 1, len(positions[1]))
|
||||||
|
assert.Equal(t, primitives.CommitteeIndex(1), positions[1][0])
|
||||||
|
}
|
||||||
|
|
||||||
func TestIsCurrentEpochSyncCommittee_UsingCache(t *testing.T) {
|
func TestIsCurrentEpochSyncCommittee_UsingCache(t *testing.T) {
|
||||||
helpers.ClearCache()
|
helpers.ClearCache()
|
||||||
|
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
package validator
|
package validator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||||
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
|
||||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||||
@@ -15,6 +17,7 @@ import (
|
|||||||
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
"github.com/OffchainLabs/prysm/v6/runtime/version"
|
||||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prysmaticlabs/go-bitfield"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (vs *Server) setSyncAggregate(ctx context.Context, blk interfaces.SignedBeaconBlock) {
|
func (vs *Server) setSyncAggregate(ctx context.Context, blk interfaces.SignedBeaconBlock) {
|
||||||
@@ -51,21 +54,28 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro
|
|||||||
if vs.SyncCommitteePool == nil {
|
if vs.SyncCommitteePool == nil {
|
||||||
return nil, errors.New("sync committee pool is nil")
|
return nil, errors.New("sync committee pool is nil")
|
||||||
}
|
}
|
||||||
// Contributions have to match the input root
|
|
||||||
contributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot)
|
poolContributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
proposerContributions := proposerSyncContributions(contributions).filterByBlockRoot(root)
|
// Contributions have to match the input root
|
||||||
|
proposerContributions := proposerSyncContributions(poolContributions).filterByBlockRoot(root)
|
||||||
|
|
||||||
// Each sync subcommittee is 128 bits and the sync committee is 512 bits for mainnet.
|
aggregatedContributions, err := vs.aggregatedSyncCommitteeMessages(ctx, slot, root, poolContributions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not get aggregated sync committee messages")
|
||||||
|
}
|
||||||
|
proposerContributions = append(proposerContributions, aggregatedContributions...)
|
||||||
|
|
||||||
|
subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount
|
||||||
var bitsHolder [][]byte
|
var bitsHolder [][]byte
|
||||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
for i := uint64(0); i < subcommitteeCount; i++ {
|
||||||
bitsHolder = append(bitsHolder, ethpb.NewSyncCommitteeAggregationBits())
|
bitsHolder = append(bitsHolder, ethpb.NewSyncCommitteeAggregationBits())
|
||||||
}
|
}
|
||||||
sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/params.BeaconConfig().SyncCommitteeSubnetCount)
|
sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/subcommitteeCount)
|
||||||
|
|
||||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
for i := uint64(0); i < subcommitteeCount; i++ {
|
||||||
cs := proposerContributions.filterBySubIndex(i)
|
cs := proposerContributions.filterBySubIndex(i)
|
||||||
aggregates, err := synccontribution.Aggregate(cs)
|
aggregates, err := synccontribution.Aggregate(cs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -107,3 +117,107 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro
|
|||||||
SyncCommitteeSignature: syncSigBytes[:],
|
SyncCommitteeSignature: syncSigBytes[:],
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (vs *Server) aggregatedSyncCommitteeMessages(
|
||||||
|
ctx context.Context,
|
||||||
|
slot primitives.Slot,
|
||||||
|
root [32]byte,
|
||||||
|
poolContributions []*ethpb.SyncCommitteeContribution,
|
||||||
|
) ([]*ethpb.SyncCommitteeContribution, error) {
|
||||||
|
subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount
|
||||||
|
subcommitteeSize := params.BeaconConfig().SyncCommitteeSize / subcommitteeCount
|
||||||
|
sigsPerSubcommittee := make([][][]byte, subcommitteeCount)
|
||||||
|
bitsPerSubcommittee := make([]bitfield.Bitfield, subcommitteeCount)
|
||||||
|
for i := uint64(0); i < subcommitteeCount; i++ {
|
||||||
|
sigsPerSubcommittee[i] = make([][]byte, 0, subcommitteeSize)
|
||||||
|
bitsPerSubcommittee[i] = ethpb.NewSyncCommitteeAggregationBits()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get committee position(s) for each message's validator index.
|
||||||
|
scMessages, err := vs.SyncCommitteePool.SyncCommitteeMessages(slot)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not get sync committee messages")
|
||||||
|
}
|
||||||
|
messageIndices := make([]primitives.ValidatorIndex, 0, len(scMessages))
|
||||||
|
messageSigs := make([][]byte, 0, len(scMessages))
|
||||||
|
for _, msg := range scMessages {
|
||||||
|
if bytes.Equal(root[:], msg.BlockRoot) {
|
||||||
|
messageIndices = append(messageIndices, msg.ValidatorIndex)
|
||||||
|
messageSigs = append(messageSigs, msg.Signature)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
st, err := vs.HeadFetcher.HeadState(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not get head state")
|
||||||
|
}
|
||||||
|
positions, err := helpers.CurrentPeriodPositions(st, messageIndices)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not get sync committee positions")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Based on committee position(s), set the appropriate subcommittee bit and signature.
|
||||||
|
for i, ci := range positions {
|
||||||
|
for _, index := range ci {
|
||||||
|
k := uint64(index)
|
||||||
|
subnetIndex := k / subcommitteeSize
|
||||||
|
indexMod := k % subcommitteeSize
|
||||||
|
|
||||||
|
// Existing aggregated contributions from the pool intersecting with aggregates
|
||||||
|
// created from single sync committee messages can result in bit intersections
|
||||||
|
// that fail to produce the best possible final aggregate. Ignoring bits that are
|
||||||
|
// already set in pool contributions makes intersections impossible.
|
||||||
|
intersects := false
|
||||||
|
for _, poolContrib := range poolContributions {
|
||||||
|
if poolContrib.SubcommitteeIndex == subnetIndex && poolContrib.AggregationBits.BitAt(indexMod) {
|
||||||
|
intersects = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !intersects && !bitsPerSubcommittee[subnetIndex].BitAt(indexMod) {
|
||||||
|
bitsPerSubcommittee[subnetIndex].SetBitAt(indexMod, true)
|
||||||
|
sigsPerSubcommittee[subnetIndex] = append(sigsPerSubcommittee[subnetIndex], messageSigs[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate.
|
||||||
|
result := make([]*ethpb.SyncCommitteeContribution, 0, subcommitteeCount)
|
||||||
|
for i := uint64(0); i < subcommitteeCount; i++ {
|
||||||
|
aggregatedSig := make([]byte, 96)
|
||||||
|
aggregatedSig[0] = 0xC0
|
||||||
|
if len(sigsPerSubcommittee[i]) != 0 {
|
||||||
|
contrib, err := aggregateSyncSubcommitteeMessages(slot, root, i, bitsPerSubcommittee[i], sigsPerSubcommittee[i])
|
||||||
|
if err != nil {
|
||||||
|
// Skip aggregating this subcommittee
|
||||||
|
log.WithError(err).Errorf("Could not aggregate sync messages for subcommittee %d", i)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result = append(result, contrib)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func aggregateSyncSubcommitteeMessages(
|
||||||
|
slot primitives.Slot,
|
||||||
|
root [32]byte,
|
||||||
|
subcommitteeIndex uint64,
|
||||||
|
bits bitfield.Bitfield,
|
||||||
|
sigs [][]byte,
|
||||||
|
) (*ethpb.SyncCommitteeContribution, error) {
|
||||||
|
var err error
|
||||||
|
uncompressedSigs := make([]bls.Signature, len(sigs))
|
||||||
|
for i, sig := range sigs {
|
||||||
|
uncompressedSigs[i], err = bls.SignatureFromBytesNoValidation(sig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not create signature from bytes")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ðpb.SyncCommitteeContribution{
|
||||||
|
Slot: slot,
|
||||||
|
BlockRoot: root[:],
|
||||||
|
SubcommitteeIndex: subcommitteeIndex,
|
||||||
|
AggregationBits: bits.Bytes(),
|
||||||
|
Signature: bls.AggregateSignatures(uncompressedSigs).Marshal(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,13 +3,67 @@ package validator
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
chainmock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
|
||||||
|
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||||
|
"github.com/OffchainLabs/prysm/v6/beacon-chain/operations/synccommittee"
|
||||||
|
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
|
||||||
"github.com/OffchainLabs/prysm/v6/config/params"
|
"github.com/OffchainLabs/prysm/v6/config/params"
|
||||||
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
|
||||||
|
"github.com/OffchainLabs/prysm/v6/crypto/bls"
|
||||||
|
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||||
|
"github.com/OffchainLabs/prysm/v6/testing/assert"
|
||||||
"github.com/OffchainLabs/prysm/v6/testing/require"
|
"github.com/OffchainLabs/prysm/v6/testing/require"
|
||||||
"github.com/OffchainLabs/prysm/v6/testing/util"
|
"github.com/OffchainLabs/prysm/v6/testing/util"
|
||||||
|
"github.com/prysmaticlabs/go-bitfield"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestProposer_GetSyncAggregate_OK(t *testing.T) {
|
||||||
|
st, err := util.NewBeaconStateAltair()
|
||||||
|
require.NoError(t, err)
|
||||||
|
proposerServer := &Server{
|
||||||
|
HeadFetcher: &chainmock.ChainService{State: st},
|
||||||
|
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||||
|
SyncCommitteePool: synccommittee.NewStore(),
|
||||||
|
}
|
||||||
|
|
||||||
|
r := params.BeaconConfig().ZeroHash
|
||||||
|
conts := []*ethpb.SyncCommitteeContribution{
|
||||||
|
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
||||||
|
{Slot: 2, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
||||||
|
{Slot: 2, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
||||||
|
{Slot: 2, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
||||||
|
{Slot: 2, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, cont := range conts {
|
||||||
|
require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont))
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregate, err := proposerServer.getSyncAggregate(t.Context(), 1, bytesutil.ToBytes32(conts[0].BlockRoot))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.DeepEqual(t, bitfield.Bitvector32{0xf, 0xf, 0xf, 0xf}, aggregate.SyncCommitteeBits)
|
||||||
|
|
||||||
|
aggregate, err = proposerServer.getSyncAggregate(t.Context(), 2, bytesutil.ToBytes32(conts[0].BlockRoot))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.DeepEqual(t, bitfield.Bitvector32{0xaa, 0xaa, 0xaa, 0xaa}, aggregate.SyncCommitteeBits)
|
||||||
|
|
||||||
|
aggregate, err = proposerServer.getSyncAggregate(t.Context(), 3, bytesutil.ToBytes32(conts[0].BlockRoot))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.DeepEqual(t, bitfield.NewBitvector32(), aggregate.SyncCommitteeBits)
|
||||||
|
}
|
||||||
|
|
||||||
func TestServer_SetSyncAggregate_EmptyCase(t *testing.T) {
|
func TestServer_SetSyncAggregate_EmptyCase(t *testing.T) {
|
||||||
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockAltair())
|
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockAltair())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@@ -25,3 +79,123 @@ func TestServer_SetSyncAggregate_EmptyCase(t *testing.T) {
|
|||||||
}
|
}
|
||||||
require.DeepEqual(t, want, agg)
|
require.DeepEqual(t, want, agg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProposer_GetSyncAggregate_IncludesSyncCommitteeMessages(t *testing.T) {
|
||||||
|
// TEST SETUP
|
||||||
|
// - validator 0 is selected twice in subcommittee 0 (indexes [0,1])
|
||||||
|
// - validator 1 is selected once in subcommittee 0 (index 2)
|
||||||
|
// - validator 2 is selected twice in subcommittee 1 (indexes [0,1])
|
||||||
|
// - validator 3 is selected once in subcommittee 1 (index 2)
|
||||||
|
// - sync committee aggregates in the pool have index 3 set for both subcommittees
|
||||||
|
|
||||||
|
subcommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
|
||||||
|
|
||||||
|
helpers.ClearCache()
|
||||||
|
st, err := util.NewBeaconStateAltair()
|
||||||
|
require.NoError(t, err)
|
||||||
|
vals := make([]*ethpb.Validator, 4)
|
||||||
|
vals[0] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf0}, 48)}
|
||||||
|
vals[1] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf1}, 48)}
|
||||||
|
vals[2] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf2}, 48)}
|
||||||
|
vals[3] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf3}, 48)}
|
||||||
|
require.NoError(t, st.SetValidators(vals))
|
||||||
|
sc := ðpb.SyncCommittee{
|
||||||
|
Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize),
|
||||||
|
}
|
||||||
|
sc.Pubkeys[0] = vals[0].PublicKey
|
||||||
|
sc.Pubkeys[1] = vals[0].PublicKey
|
||||||
|
sc.Pubkeys[2] = vals[1].PublicKey
|
||||||
|
sc.Pubkeys[subcommitteeSize] = vals[2].PublicKey
|
||||||
|
sc.Pubkeys[subcommitteeSize+1] = vals[2].PublicKey
|
||||||
|
sc.Pubkeys[subcommitteeSize+2] = vals[3].PublicKey
|
||||||
|
require.NoError(t, st.SetCurrentSyncCommittee(sc))
|
||||||
|
proposerServer := &Server{
|
||||||
|
HeadFetcher: &chainmock.ChainService{State: st},
|
||||||
|
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||||
|
SyncCommitteePool: synccommittee.NewStore(),
|
||||||
|
}
|
||||||
|
|
||||||
|
r := params.BeaconConfig().ZeroHash
|
||||||
|
msgs := []*ethpb.SyncCommitteeMessage{
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 0, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 1, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 2, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 3, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
}
|
||||||
|
for _, msg := range msgs {
|
||||||
|
require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeMessage(msg))
|
||||||
|
}
|
||||||
|
subcommittee0AggBits := ethpb.NewSyncCommitteeAggregationBits()
|
||||||
|
subcommittee0AggBits.SetBitAt(3, true)
|
||||||
|
subcommittee1AggBits := ethpb.NewSyncCommitteeAggregationBits()
|
||||||
|
subcommittee1AggBits.SetBitAt(3, true)
|
||||||
|
conts := []*ethpb.SyncCommitteeContribution{
|
||||||
|
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: subcommittee0AggBits, BlockRoot: r[:]},
|
||||||
|
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: subcommittee1AggBits, BlockRoot: r[:]},
|
||||||
|
}
|
||||||
|
for _, cont := range conts {
|
||||||
|
require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont))
|
||||||
|
}
|
||||||
|
|
||||||
|
// The final sync aggregates must have indexes [0,1,2,3] set for both subcommittees
|
||||||
|
sa, err := proposerServer.getSyncAggregate(t.Context(), 1, r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(0))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(1))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(2))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(3))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+1))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+2))
|
||||||
|
assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+3))
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_aggregatedSyncCommitteeMessages_NoIntersectionWithPoolContributions(t *testing.T) {
|
||||||
|
helpers.ClearCache()
|
||||||
|
st, err := util.NewBeaconStateAltair()
|
||||||
|
require.NoError(t, err)
|
||||||
|
vals := make([]*ethpb.Validator, 4)
|
||||||
|
vals[0] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf0}, 48)}
|
||||||
|
vals[1] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf1}, 48)}
|
||||||
|
vals[2] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf2}, 48)}
|
||||||
|
vals[3] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf3}, 48)}
|
||||||
|
require.NoError(t, st.SetValidators(vals))
|
||||||
|
sc := ðpb.SyncCommittee{
|
||||||
|
Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize),
|
||||||
|
}
|
||||||
|
sc.Pubkeys[0] = vals[0].PublicKey
|
||||||
|
sc.Pubkeys[1] = vals[1].PublicKey
|
||||||
|
sc.Pubkeys[2] = vals[2].PublicKey
|
||||||
|
sc.Pubkeys[3] = vals[3].PublicKey
|
||||||
|
require.NoError(t, st.SetCurrentSyncCommittee(sc))
|
||||||
|
proposerServer := &Server{
|
||||||
|
HeadFetcher: &chainmock.ChainService{State: st},
|
||||||
|
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
||||||
|
SyncCommitteePool: synccommittee.NewStore(),
|
||||||
|
}
|
||||||
|
|
||||||
|
r := params.BeaconConfig().ZeroHash
|
||||||
|
msgs := []*ethpb.SyncCommitteeMessage{
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 0, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 1, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 2, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
{Slot: 1, BlockRoot: r[:], ValidatorIndex: 3, Signature: bls.NewAggregateSignature().Marshal()},
|
||||||
|
}
|
||||||
|
for _, msg := range msgs {
|
||||||
|
require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeMessage(msg))
|
||||||
|
}
|
||||||
|
subcommitteeAggBits := ethpb.NewSyncCommitteeAggregationBits()
|
||||||
|
subcommitteeAggBits.SetBitAt(3, true)
|
||||||
|
cont := ðpb.SyncCommitteeContribution{
|
||||||
|
Slot: 1,
|
||||||
|
SubcommitteeIndex: 0,
|
||||||
|
Signature: bls.NewAggregateSignature().Marshal(),
|
||||||
|
AggregationBits: subcommitteeAggBits,
|
||||||
|
BlockRoot: r[:],
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregated, err := proposerServer.aggregatedSyncCommitteeMessages(t.Context(), 1, r, []*ethpb.SyncCommitteeContribution{cont})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(aggregated))
|
||||||
|
assert.Equal(t, false, aggregated[0].AggregationBits.BitAt(3))
|
||||||
|
}
|
||||||
|
|||||||
@@ -3096,49 +3096,6 @@ func TestProposer_DeleteAttsInPool_Aggregated(t *testing.T) {
|
|||||||
assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation")
|
assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProposer_GetSyncAggregate_OK(t *testing.T) {
|
|
||||||
proposerServer := &Server{
|
|
||||||
SyncChecker: &mockSync.Sync{IsSyncing: false},
|
|
||||||
SyncCommitteePool: synccommittee.NewStore(),
|
|
||||||
}
|
|
||||||
|
|
||||||
r := params.BeaconConfig().ZeroHash
|
|
||||||
conts := []*ethpb.SyncCommitteeContribution{
|
|
||||||
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]},
|
|
||||||
{Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]},
|
|
||||||
{Slot: 2, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
|
||||||
{Slot: 2, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
|
||||||
{Slot: 2, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
|
||||||
{Slot: 2, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, cont := range conts {
|
|
||||||
require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont))
|
|
||||||
}
|
|
||||||
|
|
||||||
aggregate, err := proposerServer.getSyncAggregate(t.Context(), 1, bytesutil.ToBytes32(conts[0].BlockRoot))
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.DeepEqual(t, bitfield.Bitvector32{0xf, 0xf, 0xf, 0xf}, aggregate.SyncCommitteeBits)
|
|
||||||
|
|
||||||
aggregate, err = proposerServer.getSyncAggregate(t.Context(), 2, bytesutil.ToBytes32(conts[0].BlockRoot))
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.DeepEqual(t, bitfield.Bitvector32{0xaa, 0xaa, 0xaa, 0xaa}, aggregate.SyncCommitteeBits)
|
|
||||||
|
|
||||||
aggregate, err = proposerServer.getSyncAggregate(t.Context(), 3, bytesutil.ToBytes32(conts[0].BlockRoot))
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.DeepEqual(t, bitfield.NewBitvector32(), aggregate.SyncCommitteeBits)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestProposer_PrepareBeaconProposer(t *testing.T) {
|
func TestProposer_PrepareBeaconProposer(t *testing.T) {
|
||||||
type args struct {
|
type args struct {
|
||||||
request *ethpb.PrepareBeaconProposerRequest
|
request *ethpb.PrepareBeaconProposerRequest
|
||||||
|
|||||||
3
changelog/radek_agg-sc-messages.md
Normal file
3
changelog/radek_agg-sc-messages.md
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
### Added
|
||||||
|
|
||||||
|
- Aggregate and pack sync committee messages into blocks.
|
||||||
Reference in New Issue
Block a user