From f035da6fc588d2cf0c1a18cf9e39f7b56c8f7f9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kapka?= Date: Mon, 8 Sep 2025 17:17:19 +0200 Subject: [PATCH] Aggregate and pack sync committee messages (#15608) * Aggregate and pack sync committee messages * test * simplify error check * changelog <3 * fix assert import * remove parallelization * use sync committee cache * ignore bits already set in pool messages * fuzz fix * cleanup * test panic fix * clear cache in tests --- beacon-chain/cache/sync_committee.go | 60 +++++- beacon-chain/cache/sync_committee_disabled.go | 5 + beacon-chain/core/helpers/sync_committee.go | 33 ++++ .../core/helpers/sync_committee_test.go | 32 ++++ .../v1alpha1/validator/proposer_altair.go | 128 ++++++++++++- .../validator/proposer_altair_test.go | 174 ++++++++++++++++++ .../prysm/v1alpha1/validator/proposer_test.go | 43 ----- changelog/radek_agg-sc-messages.md | 3 + 8 files changed, 418 insertions(+), 60 deletions(-) create mode 100644 changelog/radek_agg-sc-messages.md diff --git a/beacon-chain/cache/sync_committee.go b/beacon-chain/cache/sync_committee.go index 89983c38fa..ea95552fd3 100644 --- a/beacon-chain/cache/sync_committee.go +++ b/beacon-chain/cache/sync_committee.go @@ -67,6 +67,30 @@ func (s *SyncCommitteeCache) Clear() { s.cache = cache.NewFIFO(keyFn) } +// CurrentPeriodPositions returns current period positions of validator indices with respect with +// sync committee. If any input validator index has no assignment, an empty list will be returned +// for that validator. If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned. +// Manual checking of state for index position in state is recommended when `ErrNonExistingSyncCommitteeKey` is returned. +func (s *SyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) { + s.lock.RLock() + defer s.lock.RUnlock() + + pos, err := s.positionsInCommittee(root, indices) + if err != nil { + return nil, err + } + result := make([][]primitives.CommitteeIndex, len(pos)) + for i, p := range pos { + if p == nil { + result[i] = []primitives.CommitteeIndex{} + } else { + result[i] = p.currentPeriod + } + } + + return result, nil +} + // CurrentPeriodIndexPosition returns current period index position of a validator index with respect with // sync committee. If the input validator index has no assignment, an empty list will be returned. // If the input root does not exist in cache, `ErrNonExistingSyncCommitteeKey` is returned. @@ -104,11 +128,7 @@ func (s *SyncCommitteeCache) NextPeriodIndexPosition(root [32]byte, valIdx primi return pos.nextPeriod, nil } -// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping -// of validator index to its index(s) position in the sync committee. -func (s *SyncCommitteeCache) idxPositionInCommittee( - root [32]byte, valIdx primitives.ValidatorIndex, -) (*positionInCommittee, error) { +func (s *SyncCommitteeCache) positionsInCommittee(root [32]byte, indices []primitives.ValidatorIndex) ([]*positionInCommittee, error) { obj, exists, err := s.cache.GetByKey(key(root)) if err != nil { return nil, err @@ -121,13 +141,33 @@ func (s *SyncCommitteeCache) idxPositionInCommittee( if !ok { return nil, errNotSyncCommitteeIndexPosition } - idxInCommittee, ok := item.vIndexToPositionMap[valIdx] - if !ok { - SyncCommitteeCacheMiss.Inc() + result := make([]*positionInCommittee, len(indices)) + for i, idx := range indices { + idxInCommittee, ok := item.vIndexToPositionMap[idx] + if ok { + SyncCommitteeCacheHit.Inc() + result[i] = idxInCommittee + } else { + SyncCommitteeCacheMiss.Inc() + result[i] = nil + } + } + return result, nil +} + +// Helper function for `CurrentPeriodIndexPosition` and `NextPeriodIndexPosition` to return a mapping +// of validator index to its index(s) position in the sync committee. +func (s *SyncCommitteeCache) idxPositionInCommittee( + root [32]byte, valIdx primitives.ValidatorIndex, +) (*positionInCommittee, error) { + positions, err := s.positionsInCommittee(root, []primitives.ValidatorIndex{valIdx}) + if err != nil { + return nil, err + } + if len(positions) == 0 { return nil, nil } - SyncCommitteeCacheHit.Inc() - return idxInCommittee, nil + return positions[0], nil } // UpdatePositionsInCommittee updates caching of validators position in sync committee in respect to diff --git a/beacon-chain/cache/sync_committee_disabled.go b/beacon-chain/cache/sync_committee_disabled.go index ca0581df23..06309543ab 100644 --- a/beacon-chain/cache/sync_committee_disabled.go +++ b/beacon-chain/cache/sync_committee_disabled.go @@ -16,6 +16,11 @@ func NewSyncCommittee() *FakeSyncCommitteeCache { return &FakeSyncCommitteeCache{} } +// CurrentPeriodPositions -- fake +func (s *FakeSyncCommitteeCache) CurrentPeriodPositions(root [32]byte, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) { + return nil, nil +} + // CurrentEpochIndexPosition -- fake. func (s *FakeSyncCommitteeCache) CurrentPeriodIndexPosition(root [32]byte, valIdx primitives.ValidatorIndex) ([]primitives.CommitteeIndex, error) { return nil, nil diff --git a/beacon-chain/core/helpers/sync_committee.go b/beacon-chain/core/helpers/sync_committee.go index 17f23e6dc0..961aafbf5b 100644 --- a/beacon-chain/core/helpers/sync_committee.go +++ b/beacon-chain/core/helpers/sync_committee.go @@ -21,6 +21,39 @@ var ( syncCommitteeCache = cache.NewSyncCommittee() ) +// CurrentPeriodPositions returns committee indices of the current period sync committee for input validators. +func CurrentPeriodPositions(st state.BeaconState, indices []primitives.ValidatorIndex) ([][]primitives.CommitteeIndex, error) { + root, err := SyncPeriodBoundaryRoot(st) + if err != nil { + return nil, err + } + pos, err := syncCommitteeCache.CurrentPeriodPositions(root, indices) + if errors.Is(err, cache.ErrNonExistingSyncCommitteeKey) { + committee, err := st.CurrentSyncCommittee() + if err != nil { + return nil, err + } + + // Fill in the cache on miss. + go func() { + if err := syncCommitteeCache.UpdatePositionsInCommittee(root, st); err != nil { + log.WithError(err).Error("Could not fill sync committee cache on miss") + } + }() + + pos = make([][]primitives.CommitteeIndex, len(indices)) + for i, idx := range indices { + pubkey := st.PubkeyAtIndex(idx) + pos[i] = findSubCommitteeIndices(pubkey[:], committee.Pubkeys) + } + return pos, nil + } + if err != nil { + return nil, err + } + return pos, nil +} + // IsCurrentPeriodSyncCommittee returns true if the input validator index belongs in the current period sync committee // along with the sync committee root. // 1. Checks if the public key exists in the sync committee cache diff --git a/beacon-chain/core/helpers/sync_committee_test.go b/beacon-chain/core/helpers/sync_committee_test.go index e80317751c..cdc11c2f95 100644 --- a/beacon-chain/core/helpers/sync_committee_test.go +++ b/beacon-chain/core/helpers/sync_committee_test.go @@ -17,6 +17,38 @@ import ( "github.com/OffchainLabs/prysm/v6/testing/require" ) +func TestCurrentPeriodPositions(t *testing.T) { + helpers.ClearCache() + + validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize) + syncCommittee := ðpb.SyncCommittee{ + Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize), + } + for i := 0; i < len(validators); i++ { + k := make([]byte, 48) + copy(k, strconv.Itoa(i)) + validators[i] = ðpb.Validator{ + PublicKey: k, + } + syncCommittee.Pubkeys[i] = bytesutil.PadTo(k, 48) + } + state, err := state_native.InitializeFromProtoAltair(ðpb.BeaconStateAltair{ + Validators: validators, + }) + require.NoError(t, err) + require.NoError(t, state.SetCurrentSyncCommittee(syncCommittee)) + require.NoError(t, state.SetNextSyncCommittee(syncCommittee)) + require.NoError(t, err, helpers.SyncCommitteeCache().UpdatePositionsInCommittee([32]byte{}, state)) + + positions, err := helpers.CurrentPeriodPositions(state, []primitives.ValidatorIndex{0, 1}) + require.NoError(t, err) + require.Equal(t, 2, len(positions)) + require.Equal(t, 1, len(positions[0])) + assert.Equal(t, primitives.CommitteeIndex(0), positions[0][0]) + require.Equal(t, 1, len(positions[1])) + assert.Equal(t, primitives.CommitteeIndex(1), positions[1][0]) +} + func TestIsCurrentEpochSyncCommittee_UsingCache(t *testing.T) { helpers.ClearCache() diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair.go index ccb1eac556..ad6c50ba19 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair.go @@ -1,8 +1,10 @@ package validator import ( + "bytes" "context" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -15,6 +17,7 @@ import ( "github.com/OffchainLabs/prysm/v6/runtime/version" "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/pkg/errors" + "github.com/prysmaticlabs/go-bitfield" ) func (vs *Server) setSyncAggregate(ctx context.Context, blk interfaces.SignedBeaconBlock) { @@ -51,21 +54,28 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro if vs.SyncCommitteePool == nil { return nil, errors.New("sync committee pool is nil") } - // Contributions have to match the input root - contributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot) + + poolContributions, err := vs.SyncCommitteePool.SyncCommitteeContributions(slot) if err != nil { return nil, err } - proposerContributions := proposerSyncContributions(contributions).filterByBlockRoot(root) + // Contributions have to match the input root + proposerContributions := proposerSyncContributions(poolContributions).filterByBlockRoot(root) - // Each sync subcommittee is 128 bits and the sync committee is 512 bits for mainnet. + aggregatedContributions, err := vs.aggregatedSyncCommitteeMessages(ctx, slot, root, poolContributions) + if err != nil { + return nil, errors.Wrap(err, "could not get aggregated sync committee messages") + } + proposerContributions = append(proposerContributions, aggregatedContributions...) + + subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount var bitsHolder [][]byte - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { + for i := uint64(0); i < subcommitteeCount; i++ { bitsHolder = append(bitsHolder, ethpb.NewSyncCommitteeAggregationBits()) } - sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/params.BeaconConfig().SyncCommitteeSubnetCount) + sigsHolder := make([]bls.Signature, 0, params.BeaconConfig().SyncCommitteeSize/subcommitteeCount) - for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { + for i := uint64(0); i < subcommitteeCount; i++ { cs := proposerContributions.filterBySubIndex(i) aggregates, err := synccontribution.Aggregate(cs) if err != nil { @@ -107,3 +117,107 @@ func (vs *Server) getSyncAggregate(ctx context.Context, slot primitives.Slot, ro SyncCommitteeSignature: syncSigBytes[:], }, nil } + +func (vs *Server) aggregatedSyncCommitteeMessages( + ctx context.Context, + slot primitives.Slot, + root [32]byte, + poolContributions []*ethpb.SyncCommitteeContribution, +) ([]*ethpb.SyncCommitteeContribution, error) { + subcommitteeCount := params.BeaconConfig().SyncCommitteeSubnetCount + subcommitteeSize := params.BeaconConfig().SyncCommitteeSize / subcommitteeCount + sigsPerSubcommittee := make([][][]byte, subcommitteeCount) + bitsPerSubcommittee := make([]bitfield.Bitfield, subcommitteeCount) + for i := uint64(0); i < subcommitteeCount; i++ { + sigsPerSubcommittee[i] = make([][]byte, 0, subcommitteeSize) + bitsPerSubcommittee[i] = ethpb.NewSyncCommitteeAggregationBits() + } + + // Get committee position(s) for each message's validator index. + scMessages, err := vs.SyncCommitteePool.SyncCommitteeMessages(slot) + if err != nil { + return nil, errors.Wrap(err, "could not get sync committee messages") + } + messageIndices := make([]primitives.ValidatorIndex, 0, len(scMessages)) + messageSigs := make([][]byte, 0, len(scMessages)) + for _, msg := range scMessages { + if bytes.Equal(root[:], msg.BlockRoot) { + messageIndices = append(messageIndices, msg.ValidatorIndex) + messageSigs = append(messageSigs, msg.Signature) + } + } + st, err := vs.HeadFetcher.HeadState(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not get head state") + } + positions, err := helpers.CurrentPeriodPositions(st, messageIndices) + if err != nil { + return nil, errors.Wrap(err, "could not get sync committee positions") + } + + // Based on committee position(s), set the appropriate subcommittee bit and signature. + for i, ci := range positions { + for _, index := range ci { + k := uint64(index) + subnetIndex := k / subcommitteeSize + indexMod := k % subcommitteeSize + + // Existing aggregated contributions from the pool intersecting with aggregates + // created from single sync committee messages can result in bit intersections + // that fail to produce the best possible final aggregate. Ignoring bits that are + // already set in pool contributions makes intersections impossible. + intersects := false + for _, poolContrib := range poolContributions { + if poolContrib.SubcommitteeIndex == subnetIndex && poolContrib.AggregationBits.BitAt(indexMod) { + intersects = true + } + } + if !intersects && !bitsPerSubcommittee[subnetIndex].BitAt(indexMod) { + bitsPerSubcommittee[subnetIndex].SetBitAt(indexMod, true) + sigsPerSubcommittee[subnetIndex] = append(sigsPerSubcommittee[subnetIndex], messageSigs[i]) + } + } + } + + // Aggregate. + result := make([]*ethpb.SyncCommitteeContribution, 0, subcommitteeCount) + for i := uint64(0); i < subcommitteeCount; i++ { + aggregatedSig := make([]byte, 96) + aggregatedSig[0] = 0xC0 + if len(sigsPerSubcommittee[i]) != 0 { + contrib, err := aggregateSyncSubcommitteeMessages(slot, root, i, bitsPerSubcommittee[i], sigsPerSubcommittee[i]) + if err != nil { + // Skip aggregating this subcommittee + log.WithError(err).Errorf("Could not aggregate sync messages for subcommittee %d", i) + continue + } + result = append(result, contrib) + } + } + + return result, nil +} + +func aggregateSyncSubcommitteeMessages( + slot primitives.Slot, + root [32]byte, + subcommitteeIndex uint64, + bits bitfield.Bitfield, + sigs [][]byte, +) (*ethpb.SyncCommitteeContribution, error) { + var err error + uncompressedSigs := make([]bls.Signature, len(sigs)) + for i, sig := range sigs { + uncompressedSigs[i], err = bls.SignatureFromBytesNoValidation(sig) + if err != nil { + return nil, errors.Wrap(err, "could not create signature from bytes") + } + } + return ðpb.SyncCommitteeContribution{ + Slot: slot, + BlockRoot: root[:], + SubcommitteeIndex: subcommitteeIndex, + AggregationBits: bits.Bytes(), + Signature: bls.AggregateSignatures(uncompressedSigs).Marshal(), + }, nil +} diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair_test.go index af3ed84549..63123eb70e 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_altair_test.go @@ -3,13 +3,67 @@ package validator import ( "testing" + chainmock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers" + "github.com/OffchainLabs/prysm/v6/beacon-chain/operations/synccommittee" + mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v6/crypto/bls" + "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/util" + "github.com/prysmaticlabs/go-bitfield" ) +func TestProposer_GetSyncAggregate_OK(t *testing.T) { + st, err := util.NewBeaconStateAltair() + require.NoError(t, err) + proposerServer := &Server{ + HeadFetcher: &chainmock.ChainService{State: st}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + SyncCommitteePool: synccommittee.NewStore(), + } + + r := params.BeaconConfig().ZeroHash + conts := []*ethpb.SyncCommitteeContribution{ + {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, + {Slot: 2, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, + {Slot: 2, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, + {Slot: 2, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, + {Slot: 2, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, + } + + for _, cont := range conts { + require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont)) + } + + aggregate, err := proposerServer.getSyncAggregate(t.Context(), 1, bytesutil.ToBytes32(conts[0].BlockRoot)) + require.NoError(t, err) + require.DeepEqual(t, bitfield.Bitvector32{0xf, 0xf, 0xf, 0xf}, aggregate.SyncCommitteeBits) + + aggregate, err = proposerServer.getSyncAggregate(t.Context(), 2, bytesutil.ToBytes32(conts[0].BlockRoot)) + require.NoError(t, err) + require.DeepEqual(t, bitfield.Bitvector32{0xaa, 0xaa, 0xaa, 0xaa}, aggregate.SyncCommitteeBits) + + aggregate, err = proposerServer.getSyncAggregate(t.Context(), 3, bytesutil.ToBytes32(conts[0].BlockRoot)) + require.NoError(t, err) + require.DeepEqual(t, bitfield.NewBitvector32(), aggregate.SyncCommitteeBits) +} + func TestServer_SetSyncAggregate_EmptyCase(t *testing.T) { b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlockAltair()) require.NoError(t, err) @@ -25,3 +79,123 @@ func TestServer_SetSyncAggregate_EmptyCase(t *testing.T) { } require.DeepEqual(t, want, agg) } + +func TestProposer_GetSyncAggregate_IncludesSyncCommitteeMessages(t *testing.T) { + // TEST SETUP + // - validator 0 is selected twice in subcommittee 0 (indexes [0,1]) + // - validator 1 is selected once in subcommittee 0 (index 2) + // - validator 2 is selected twice in subcommittee 1 (indexes [0,1]) + // - validator 3 is selected once in subcommittee 1 (index 2) + // - sync committee aggregates in the pool have index 3 set for both subcommittees + + subcommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount + + helpers.ClearCache() + st, err := util.NewBeaconStateAltair() + require.NoError(t, err) + vals := make([]*ethpb.Validator, 4) + vals[0] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf0}, 48)} + vals[1] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf1}, 48)} + vals[2] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf2}, 48)} + vals[3] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf3}, 48)} + require.NoError(t, st.SetValidators(vals)) + sc := ðpb.SyncCommittee{ + Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize), + } + sc.Pubkeys[0] = vals[0].PublicKey + sc.Pubkeys[1] = vals[0].PublicKey + sc.Pubkeys[2] = vals[1].PublicKey + sc.Pubkeys[subcommitteeSize] = vals[2].PublicKey + sc.Pubkeys[subcommitteeSize+1] = vals[2].PublicKey + sc.Pubkeys[subcommitteeSize+2] = vals[3].PublicKey + require.NoError(t, st.SetCurrentSyncCommittee(sc)) + proposerServer := &Server{ + HeadFetcher: &chainmock.ChainService{State: st}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + SyncCommitteePool: synccommittee.NewStore(), + } + + r := params.BeaconConfig().ZeroHash + msgs := []*ethpb.SyncCommitteeMessage{ + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 0, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 1, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 2, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 3, Signature: bls.NewAggregateSignature().Marshal()}, + } + for _, msg := range msgs { + require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeMessage(msg)) + } + subcommittee0AggBits := ethpb.NewSyncCommitteeAggregationBits() + subcommittee0AggBits.SetBitAt(3, true) + subcommittee1AggBits := ethpb.NewSyncCommitteeAggregationBits() + subcommittee1AggBits.SetBitAt(3, true) + conts := []*ethpb.SyncCommitteeContribution{ + {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: subcommittee0AggBits, BlockRoot: r[:]}, + {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: subcommittee1AggBits, BlockRoot: r[:]}, + } + for _, cont := range conts { + require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont)) + } + + // The final sync aggregates must have indexes [0,1,2,3] set for both subcommittees + sa, err := proposerServer.getSyncAggregate(t.Context(), 1, r) + require.NoError(t, err) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(0)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(1)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(2)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(3)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+1)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+2)) + assert.Equal(t, true, sa.SyncCommitteeBits.BitAt(subcommitteeSize+3)) +} + +func Test_aggregatedSyncCommitteeMessages_NoIntersectionWithPoolContributions(t *testing.T) { + helpers.ClearCache() + st, err := util.NewBeaconStateAltair() + require.NoError(t, err) + vals := make([]*ethpb.Validator, 4) + vals[0] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf0}, 48)} + vals[1] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf1}, 48)} + vals[2] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf2}, 48)} + vals[3] = ðpb.Validator{PublicKey: bytesutil.PadTo([]byte{0xf3}, 48)} + require.NoError(t, st.SetValidators(vals)) + sc := ðpb.SyncCommittee{ + Pubkeys: make([][]byte, params.BeaconConfig().SyncCommitteeSize), + } + sc.Pubkeys[0] = vals[0].PublicKey + sc.Pubkeys[1] = vals[1].PublicKey + sc.Pubkeys[2] = vals[2].PublicKey + sc.Pubkeys[3] = vals[3].PublicKey + require.NoError(t, st.SetCurrentSyncCommittee(sc)) + proposerServer := &Server{ + HeadFetcher: &chainmock.ChainService{State: st}, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + SyncCommitteePool: synccommittee.NewStore(), + } + + r := params.BeaconConfig().ZeroHash + msgs := []*ethpb.SyncCommitteeMessage{ + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 0, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 1, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 2, Signature: bls.NewAggregateSignature().Marshal()}, + {Slot: 1, BlockRoot: r[:], ValidatorIndex: 3, Signature: bls.NewAggregateSignature().Marshal()}, + } + for _, msg := range msgs { + require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeMessage(msg)) + } + subcommitteeAggBits := ethpb.NewSyncCommitteeAggregationBits() + subcommitteeAggBits.SetBitAt(3, true) + cont := ðpb.SyncCommitteeContribution{ + Slot: 1, + SubcommitteeIndex: 0, + Signature: bls.NewAggregateSignature().Marshal(), + AggregationBits: subcommitteeAggBits, + BlockRoot: r[:], + } + + aggregated, err := proposerServer.aggregatedSyncCommitteeMessages(t.Context(), 1, r, []*ethpb.SyncCommitteeContribution{cont}) + require.NoError(t, err) + require.Equal(t, 1, len(aggregated)) + assert.Equal(t, false, aggregated[0].AggregationBits.BitAt(3)) +} diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index faa2212034..20c7fe5f11 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -3096,49 +3096,6 @@ func TestProposer_DeleteAttsInPool_Aggregated(t *testing.T) { assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation") } -func TestProposer_GetSyncAggregate_OK(t *testing.T) { - proposerServer := &Server{ - SyncChecker: &mockSync.Sync{IsSyncing: false}, - SyncCommitteePool: synccommittee.NewStore(), - } - - r := params.BeaconConfig().ZeroHash - conts := []*ethpb.SyncCommitteeContribution{ - {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b0001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1001}, BlockRoot: r[:]}, - {Slot: 1, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b1110}, BlockRoot: r[:]}, - {Slot: 2, SubcommitteeIndex: 0, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, - {Slot: 2, SubcommitteeIndex: 1, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, - {Slot: 2, SubcommitteeIndex: 2, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, - {Slot: 2, SubcommitteeIndex: 3, Signature: bls.NewAggregateSignature().Marshal(), AggregationBits: []byte{0b10101010}, BlockRoot: r[:]}, - } - - for _, cont := range conts { - require.NoError(t, proposerServer.SyncCommitteePool.SaveSyncCommitteeContribution(cont)) - } - - aggregate, err := proposerServer.getSyncAggregate(t.Context(), 1, bytesutil.ToBytes32(conts[0].BlockRoot)) - require.NoError(t, err) - require.DeepEqual(t, bitfield.Bitvector32{0xf, 0xf, 0xf, 0xf}, aggregate.SyncCommitteeBits) - - aggregate, err = proposerServer.getSyncAggregate(t.Context(), 2, bytesutil.ToBytes32(conts[0].BlockRoot)) - require.NoError(t, err) - require.DeepEqual(t, bitfield.Bitvector32{0xaa, 0xaa, 0xaa, 0xaa}, aggregate.SyncCommitteeBits) - - aggregate, err = proposerServer.getSyncAggregate(t.Context(), 3, bytesutil.ToBytes32(conts[0].BlockRoot)) - require.NoError(t, err) - require.DeepEqual(t, bitfield.NewBitvector32(), aggregate.SyncCommitteeBits) -} - func TestProposer_PrepareBeaconProposer(t *testing.T) { type args struct { request *ethpb.PrepareBeaconProposerRequest diff --git a/changelog/radek_agg-sc-messages.md b/changelog/radek_agg-sc-messages.md new file mode 100644 index 0000000000..eb27548f89 --- /dev/null +++ b/changelog/radek_agg-sc-messages.md @@ -0,0 +1,3 @@ +### Added + +- Aggregate and pack sync committee messages into blocks. \ No newline at end of file