Compare commits

...

2 Commits

Author SHA1 Message Date
james-prysm
0a49546598 payload attestation pool 2026-01-30 15:45:17 -06:00
terence
a7fdd11777 gloas: sample PTC per committee (#16293)
This PR updates `get_ptc` construction to sample ptc
committee-by-committee instead of concatenating all beacon committees
into a large slice. No functional changes to payload attestation
verification
2026-01-29 14:21:54 +00:00
5 changed files with 544 additions and 19 deletions

View File

@@ -114,17 +114,32 @@ func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
}
committeesPerSlot := helpers.SlotCommitteeCount(activeCount)
out := make([]primitives.ValidatorIndex, 0, activeCount/uint64(params.BeaconConfig().SlotsPerEpoch))
for i := primitives.CommitteeIndex(0); i < primitives.CommitteeIndex(committeesPerSlot); i++ {
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, i)
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon committee %d", i)
selected := make([]primitives.ValidatorIndex, 0, fieldparams.PTCSize)
var i uint64
for uint64(len(selected)) < fieldparams.PTCSize {
if ctx.Err() != nil {
return nil, ctx.Err()
}
for committeeIndex := primitives.CommitteeIndex(0); committeeIndex < primitives.CommitteeIndex(committeesPerSlot); committeeIndex++ {
if uint64(len(selected)) >= fieldparams.PTCSize {
break
}
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, committeeIndex)
if err != nil {
return nil, errors.Wrapf(err, "failed to get beacon committee %d", committeeIndex)
}
selected, i, err = selectByBalanceFill(ctx, st, committee, seed, selected, i)
if err != nil {
return nil, errors.Wrapf(err, "failed to sample beacon committee %d", committeeIndex)
}
}
out = append(out, committee...)
}
return selectByBalance(ctx, st, out, seed, fieldparams.PTCSize)
return selected, nil
}
// ptcSeed computes the seed for the payload timeliness committee.
@@ -148,33 +163,39 @@ func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitiv
// if compute_balance_weighted_acceptance(state, indices[next], seed, i):
// selected.append(indices[next])
// i += 1
func selectByBalance(ctx context.Context, st state.ReadOnlyBeaconState, candidates []primitives.ValidatorIndex, seed [32]byte, count uint64) ([]primitives.ValidatorIndex, error) {
if len(candidates) == 0 {
return nil, errors.New("no candidates for balance weighted selection")
}
func selectByBalanceFill(
ctx context.Context,
st state.ReadOnlyBeaconState,
candidates []primitives.ValidatorIndex,
seed [32]byte,
selected []primitives.ValidatorIndex,
i uint64,
) ([]primitives.ValidatorIndex, uint64, error) {
hashFunc := hash.CustomSHA256Hasher()
// Pre-allocate buffer for hash input: seed (32 bytes) + round counter (8 bytes).
var buf [40]byte
copy(buf[:], seed[:])
maxBalance := params.BeaconConfig().MaxEffectiveBalanceElectra
selected := make([]primitives.ValidatorIndex, 0, count)
total := uint64(len(candidates))
for i := uint64(0); uint64(len(selected)) < count; i++ {
for _, idx := range candidates {
if ctx.Err() != nil {
return nil, ctx.Err()
return nil, i, ctx.Err()
}
idx := candidates[i%total]
ok, err := acceptByBalance(st, idx, buf[:], hashFunc, maxBalance, i)
if err != nil {
return nil, err
return nil, i, err
}
if ok {
selected = append(selected, idx)
}
if uint64(len(selected)) == fieldparams.PTCSize {
break
}
i++
}
return selected, nil
return selected, i, nil
}
// acceptByBalance determines if a validator is accepted based on its effective balance.

View File

@@ -0,0 +1,32 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["pool.go"],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation",
visibility = ["//visibility:public"],
deps = [
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["pool_test.go"],
embed = [":go_default_library"],
deps = [
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

View File

@@ -0,0 +1,179 @@
package payloadattestation
import (
"sync"
"github.com/OffchainLabs/go-bitfield"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/crypto/hash"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
var errNilPayloadAttestationMessage = errors.New("nil payload attestation message")
// PoolManager maintains pending payload attestations.
// This pool is used by proposers to insert payload attestations into new blocks.
type PoolManager interface {
// PendingPayloadAttestations returns all pending aggregated payload attestations.
// If a slot is provided, only attestations for that slot are returned.
PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation
// InsertPayloadAttestation inserts or aggregates a payload attestation
// message into the pool. The idx parameter is the PTC committee index
// of the validator (position in the bitvector).
InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) error
// Seen returns true if the PTC committee index has already been seen
// for the given PayloadAttestationData.
Seen(data *ethpb.PayloadAttestationData, idx uint64) bool
// MarkIncluded removes the attestation matching the given data from the pool.
MarkIncluded(att *ethpb.PayloadAttestation)
}
// Pool is a concrete implementation of PoolManager.
// Keyed by hash of PayloadAttestationData; stores aggregated PayloadAttestation.
type Pool struct {
lock sync.RWMutex
pending map[[32]byte]*ethpb.PayloadAttestation
}
// NewPool returns an initialized pool.
func NewPool() *Pool {
return &Pool{
pending: make(map[[32]byte]*ethpb.PayloadAttestation),
}
}
// PendingPayloadAttestations returns all pending payload attestations.
// If a slot filter is provided, only attestations for that slot are returned.
func (p *Pool) PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation {
p.lock.RLock()
defer p.lock.RUnlock()
result := make([]*ethpb.PayloadAttestation, 0, len(p.pending))
for _, att := range p.pending {
if len(slot) > 0 && att.Data.Slot != slot[0] {
continue
}
result = append(result, att)
}
return result
}
// InsertPayloadAttestation inserts a payload attestation message into the pool,
// aggregating it with any existing attestation that shares the same PayloadAttestationData.
// The idx parameter is the PTC committee index used to set the aggregation bit.
func (p *Pool) InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) error {
if msg == nil || msg.Data == nil {
return errNilPayloadAttestationMessage
}
key, err := dataKey(msg.Data)
if err != nil {
return errors.Wrap(err, "could not compute data key")
}
p.lock.Lock()
defer p.lock.Unlock()
existing, ok := p.pending[key]
if !ok {
p.pending[key] = messageToPayloadAttestation(msg, idx)
return nil
}
if existing.AggregationBits.BitAt(idx) {
return nil
}
sig, err := aggregateSigFromMessage(existing, msg)
if err != nil {
return errors.Wrap(err, "could not aggregate signatures")
}
existing.Signature = sig
existing.AggregationBits.SetBitAt(idx, true)
return nil
}
// Seen returns true if the PTC committee index has already been seen
// for the given PayloadAttestationData.
func (p *Pool) Seen(data *ethpb.PayloadAttestationData, idx uint64) bool {
if data == nil {
return false
}
key, err := dataKey(data)
if err != nil {
return false
}
p.lock.RLock()
defer p.lock.RUnlock()
existing, ok := p.pending[key]
if !ok {
return false
}
return existing.AggregationBits.BitAt(idx)
}
// MarkIncluded removes the attestation with matching data from the pool.
func (p *Pool) MarkIncluded(att *ethpb.PayloadAttestation) {
if att == nil || att.Data == nil {
return
}
key, err := dataKey(att.Data)
if err != nil {
return
}
p.lock.Lock()
defer p.lock.Unlock()
delete(p.pending, key)
}
// messageToPayloadAttestation creates a PayloadAttestation with a single
// aggregated bit from the passed PayloadAttestationMessage.
func messageToPayloadAttestation(msg *ethpb.PayloadAttestationMessage, idx uint64) *ethpb.PayloadAttestation {
bits := bitfield.NewBitvector512()
bits.SetBitAt(idx, true)
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: bytesutil.SafeCopyBytes(msg.Data.BeaconBlockRoot),
Slot: msg.Data.Slot,
PayloadPresent: msg.Data.PayloadPresent,
BlobDataAvailable: msg.Data.BlobDataAvailable,
}
return &ethpb.PayloadAttestation{
AggregationBits: bits,
Data: data,
Signature: bytesutil.SafeCopyBytes(msg.Signature),
}
}
// aggregateSigFromMessage returns the aggregated signature by combining the
// existing aggregated signature with the message's signature.
func aggregateSigFromMessage(aggregated *ethpb.PayloadAttestation, message *ethpb.PayloadAttestationMessage) ([]byte, error) {
aggSig, err := bls.SignatureFromBytesNoValidation(aggregated.Signature)
if err != nil {
return nil, err
}
sig, err := bls.SignatureFromBytesNoValidation(message.Signature)
if err != nil {
return nil, err
}
return bls.AggregateSignatures([]bls.Signature{aggSig, sig}).Marshal(), nil
}
// dataKey computes a deterministic key for PayloadAttestationData
// by hashing its serialized form.
func dataKey(data *ethpb.PayloadAttestationData) ([32]byte, error) {
enc, err := proto.Marshal(data)
if err != nil {
return [32]byte{}, err
}
return hash.Hash(enc), nil
}

View File

@@ -0,0 +1,291 @@
package payloadattestation
import (
"testing"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestPool_PendingPayloadAttestations(t *testing.T) {
t.Run("empty pool", func(t *testing.T) {
pool := NewPool()
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 0, len(atts))
})
t.Run("non-empty pool", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 2,
PayloadPresent: false,
BlobDataAvailable: true,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 2, len(atts))
})
t.Run("filter by slot", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 2,
PayloadPresent: false,
BlobDataAvailable: true,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations(primitives.Slot(1))
assert.Equal(t, 1, len(atts))
assert.Equal(t, primitives.Slot(1), atts[0].Data.Slot)
atts = pool.PendingPayloadAttestations(primitives.Slot(2))
assert.Equal(t, 1, len(atts))
assert.Equal(t, primitives.Slot(2), atts[0].Data.Slot)
atts = pool.PendingPayloadAttestations(primitives.Slot(99))
assert.Equal(t, 0, len(atts))
})
}
func TestPool_InsertPayloadAttestation(t *testing.T) {
t.Run("nil message", func(t *testing.T) {
pool := NewPool()
err := pool.InsertPayloadAttestation(nil, 0)
require.ErrorContains(t, "nil payload attestation message", err)
})
t.Run("nil data", func(t *testing.T) {
pool := NewPool()
err := pool.InsertPayloadAttestation(&ethpb.PayloadAttestationMessage{}, 0)
require.ErrorContains(t, "nil payload attestation message", err)
})
t.Run("insert creates new entry with correct aggregation bit", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
idx := uint64(5)
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(idx))
assert.Equal(t, false, atts[0].AggregationBits.BitAt(idx+1))
})
t.Run("duplicate index is no-op", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
idx := uint64(3)
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
firstSig := bytesutil.SafeCopyBytes(pool.PendingPayloadAttestations()[0].Signature)
require.NoError(t, pool.InsertPayloadAttestation(msg, idx))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.DeepEqual(t, firstSig, atts[0].Signature)
})
t.Run("aggregates different indices", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
root := make([]byte, 32)
root[0] = 'r'
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: root,
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
}
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: data,
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: data,
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 5))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 7))
atts := pool.PendingPayloadAttestations()
require.Equal(t, 1, len(atts))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(5))
assert.Equal(t, true, atts[0].AggregationBits.BitAt(7))
assert.Equal(t, false, atts[0].AggregationBits.BitAt(6))
})
t.Run("different data creates separate entries", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg1 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
msg2 := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 1,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: false, // different
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg1, 0))
require.NoError(t, pool.InsertPayloadAttestation(msg2, 1))
atts := pool.PendingPayloadAttestations()
assert.Equal(t, 2, len(atts))
})
}
func TestPool_Seen(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
data := &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
}
assert.Equal(t, false, pool.Seen(data, 5))
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: data,
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 5))
assert.Equal(t, true, pool.Seen(data, 5))
assert.Equal(t, false, pool.Seen(data, 6))
assert.Equal(t, false, pool.Seen(nil, 5))
}
func TestPool_MarkIncluded(t *testing.T) {
t.Run("mark included removes from pool", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 0))
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
pool.MarkIncluded(&ethpb.PayloadAttestation{
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
})
assert.Equal(t, 0, len(pool.PendingPayloadAttestations()))
})
t.Run("mark included with non-matching data does nothing", func(t *testing.T) {
pool := NewPool()
sig := bls.NewAggregateSignature().Marshal()
msg := &ethpb.PayloadAttestationMessage{
ValidatorIndex: 0,
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 1,
PayloadPresent: true,
BlobDataAvailable: false,
},
Signature: sig,
}
require.NoError(t, pool.InsertPayloadAttestation(msg, 0))
pool.MarkIncluded(&ethpb.PayloadAttestation{
Data: &ethpb.PayloadAttestationData{
BeaconBlockRoot: make([]byte, 32),
Slot: 999,
PayloadPresent: true,
BlobDataAvailable: false,
},
})
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
})
t.Run("mark included with nil is safe", func(t *testing.T) {
pool := NewPool()
pool.MarkIncluded(nil)
pool.MarkIncluded(&ethpb.PayloadAttestation{})
})
}

View File

@@ -0,0 +1,2 @@
### Changed
- Sample PTC per committee to reduce allocations.