Compare commits

...

1 Commits

Author SHA1 Message Date
terence tsao
9d07b4f01d attest every slot
attest every slot
2025-03-28 14:18:19 -07:00
5 changed files with 179 additions and 30 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
log "github.com/sirupsen/logrus"
)
var (
@@ -25,27 +26,27 @@ func GetProposerRewardNumerator(
st state.ReadOnlyBeaconState,
att ethpb.Att,
totalBalance uint64,
) (uint64, error) {
) (uint64, uint64, error) {
data := att.GetData()
delay, err := st.Slot().SafeSubSlot(data.Slot)
if err != nil {
return 0, fmt.Errorf("attestation slot %d exceeds state slot %d", data.Slot, st.Slot())
return 0, 0, fmt.Errorf("attestation slot %d exceeds state slot %d", data.Slot, st.Slot())
}
flags, err := altair.AttestationParticipationFlagIndices(st, data, delay)
if err != nil {
return 0, err
return 0, 0, err
}
committees, err := helpers.AttestationCommitteesFromState(ctx, st, att)
if err != nil {
return 0, err
return 0, 0, err
}
indices, err := attestation.AttestingIndices(att, committees...)
if err != nil {
return 0, err
return 0, 0, err
}
var participation customtypes.ReadOnlyParticipation
@@ -55,19 +56,21 @@ func GetProposerRewardNumerator(
participation, err = st.PreviousEpochParticipationReadOnly()
}
if err != nil {
return 0, err
return 0, 0, err
}
cfg := params.BeaconConfig()
var rewardNumerator uint64
var gotReward bool
var alreadyVotedCount uint64
for _, index := range indices {
if index >= uint64(participation.Len()) {
return 0, fmt.Errorf("index %d exceeds participation length %d", index, participation.Len())
return 0, 0, fmt.Errorf("index %d exceeds participation length %d", index, participation.Len())
}
br, err := altair.BaseRewardWithTotalBalance(st, primitives.ValidatorIndex(index), totalBalance)
if err != nil {
return 0, err
return 0, 0, err
}
for _, entry := range []struct {
@@ -81,14 +84,26 @@ func GetProposerRewardNumerator(
if flags[entry.flagIndex] { // If set, the validator voted correctly for the attestation given flag index.
hasVoted, err := altair.HasValidatorFlag(participation.At(index), entry.flagIndex)
if err != nil {
return 0, err
return 0, 0, err
}
if !hasVoted { // If set, the validator has already voted in the beacon state so we don't double count.
rewardNumerator += br * entry.weight
gotReward = true
}
}
}
if !gotReward {
alreadyVotedCount++
}
}
return rewardNumerator, nil
log.WithFields(log.Fields{
"slot": data.Slot,
"aggregationBitCount": att.GetAggregationBits().Count(),
"committeeBitCount": att.CommitteeBitsVal().Count(),
"attestedCount": len(indices),
"alreadyVoted": alreadyVotedCount,
}).Info("Counted attestation")
return rewardNumerator, alreadyVotedCount, nil
}

View File

@@ -3,6 +3,7 @@ package validator
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
@@ -11,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
@@ -526,3 +528,113 @@ func blobsAndProofs(req *ethpb.GenericSignedBeaconBlock) ([][]byte, [][]byte, er
return nil, nil, errors.Errorf("unknown request type provided: %T", req)
}
}
func (vs *Server) PackAttestation() {
for vs.TimeFetcher.GenesisTime().IsZero() {
time.Sleep(5 * time.Second)
}
genTime := vs.TimeFetcher.GenesisTime()
var (
attestationSlotDelays [8]*prometheus.HistogramVec
attestationCommitteeCounts [8]*prometheus.HistogramVec
attestationAggregationCounts [8]*prometheus.HistogramVec
attestationAlreadyAttestedRatio [8]*prometheus.HistogramVec
)
for i := 0; i < 8; i++ {
suffix := strconv.Itoa(i)
attestationSlotDelays[i] = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "attestation" + suffix + "_slot_delay",
Help: "Slot delay for attestation " + suffix,
Buckets: []float64{1, 2, 3, 4, 8, 16, 32},
},
nil,
)
attestationCommitteeCounts[i] = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "attestation" + suffix + "_committee_bit_count",
Help: "Committee bit count for attestation " + suffix,
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
},
nil,
)
attestationAggregationCounts[i] = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "attestation" + suffix + "_aggregation_bit_count",
Help: "Aggregation bit count for attestation " + suffix,
Buckets: []float64{
10, 50, 100, 500,
1000, 2000, 5000,
10000, 20000, 40000,
},
},
nil,
)
attestationAlreadyAttestedRatio[i] = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "attestation" + suffix + "_already_attested_ratio",
Help: "Ratio of aggregation bits to committee bits for attestation " + suffix,
Buckets: []float64{0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 1.0},
},
nil,
)
}
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-vs.Ctx.Done():
ticker.Done()
return
case slot := <-ticker.C():
ctx := context.Background()
hs, _, err := vs.getParentState(ctx, slot)
if err != nil {
log.Error(err)
continue
}
timer := prometheus.NewTimer(packAttestationDuration)
atts, m, err := vs.packAttestations(ctx, hs, slot)
if err != nil {
log.Error(err)
continue
}
timer.ObserveDuration()
for i, att := range atts {
slotNum := att.GetData().Slot
aggregationCount := att.GetAggregationBits().Count()
committeeCount := att.CommitteeBitsVal().Count()
log.WithFields(logrus.Fields{
"slot": slotNum,
"aggregationBitCount": aggregationCount,
"committeeBitCount": committeeCount,
}).Infof("Attestation #%d", i)
if i >= 8 || committeeCount == 0 {
continue
}
delay := float64(slot - slotNum)
ratio := float64(m[att]) / float64(committeeCount)
attestationSlotDelays[i].With(nil).Observe(delay)
attestationAggregationCounts[i].With(nil).Observe(float64(aggregationCount))
attestationCommitteeCounts[i].With(nil).Observe(float64(committeeCount))
if ratio <= 1.0 {
attestationAlreadyAttestedRatio[i].With(nil).Observe(ratio)
}
}
}
}
}

View File

@@ -9,6 +9,7 @@ import (
"sort"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/electra"
@@ -27,9 +28,26 @@ import (
"github.com/sirupsen/logrus"
)
var sortAttestationByRewardDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "sort_attestation_by_reward_duration",
Buckets: prometheus.DefBuckets,
})
var packAttestationDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "pack_attestation_duration",
Buckets: prometheus.DefBuckets,
})
var consideredAttestations = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "considered_attestations",
Help: "Number of attestations considered for inclusion in the current block",
},
)
type proposerAtts []ethpb.Att
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState, blkSlot primitives.Slot) ([]ethpb.Att, error) {
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState, blkSlot primitives.Slot) ([]ethpb.Att, map[ethpb.Att]uint64, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.packAttestations")
defer span.End()
@@ -72,14 +90,14 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
// prevents inefficient aggregates being created.
versionAtts, err = proposerAtts(versionAtts).dedup()
if err != nil {
return nil, err
return nil, nil, err
}
attsById := make(map[attestation.Id][]ethpb.Att, len(versionAtts))
for _, att := range versionAtts {
id, err := attestation.NewId(att, attestation.Data)
if err != nil {
return nil, errors.Wrap(err, "could not create attestation ID")
return nil, nil, errors.Wrap(err, "could not create attestation ID")
}
attsById[id] = append(attsById[id], att)
}
@@ -87,7 +105,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
for id, as := range attsById {
as, err := attaggregation.Aggregate(as)
if err != nil {
return nil, err
return nil, nil, err
}
attsById[id] = as
}
@@ -96,7 +114,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
if postElectra {
attsForInclusion, err = onChainAggregates(attsById)
if err != nil {
return nil, err
return nil, nil, err
}
} else {
attsForInclusion = make([]ethpb.Att, 0)
@@ -107,28 +125,28 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
deduped, err := attsForInclusion.dedup()
if err != nil {
return nil, err
return nil, nil, err
}
var sorted proposerAtts
var m map[ethpb.Att]uint64
if postElectra {
st, err := vs.HeadFetcher.HeadStateReadOnly(ctx)
consideredAttestations.Set(float64(len(deduped)))
timer := prometheus.NewTimer(sortAttestationByRewardDuration)
sorted, m, err = deduped.sortOnChainAggregates(ctx, latestState)
if err != nil {
return nil, err
}
sorted, err = deduped.sortOnChainAggregates(ctx, st)
if err != nil {
return nil, err
return nil, nil, err
}
timer.ObserveDuration()
} else {
sorted, err = deduped.sort()
if err != nil {
return nil, err
return nil, nil, err
}
}
atts = sorted.limitToMaxAttestations()
return vs.filterAttestationBySignature(ctx, atts, latestState)
return atts, m, nil
}
func onChainAggregates(attsById map[attestation.Id][]ethpb.Att) (proposerAtts, error) {
@@ -277,29 +295,31 @@ func (a proposerAtts) sort() (proposerAtts, error) {
return a.sortBySlotAndCommittee()
}
func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOnlyBeaconState) (proposerAtts, error) {
func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOnlyBeaconState) (proposerAtts, map[ethpb.Att]uint64, error) {
if len(a) < 2 {
return a, nil
return a, nil, nil
}
totalBalance, err := helpers.TotalActiveBalance(st)
if err != nil {
return nil, err
return nil, nil, err
}
// Sort attestation by proposer reward numerator using a cache.
cache := make(map[ethpb.Att]uint64)
alreadyCounted := make(map[ethpb.Att]uint64)
getCachedReward := func(att ethpb.Att) uint64 {
if val, ok := cache[att]; ok {
return val
}
r, err := electra.GetProposerRewardNumerator(ctx, st, att, totalBalance)
r, c, err := electra.GetProposerRewardNumerator(ctx, st, att, totalBalance)
if err != nil {
log.WithError(err).Debug("Failed to get proposer reward numerator")
return 0
}
cache[att] = r
alreadyCounted[att] = c
return r
}
@@ -309,7 +329,7 @@ func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOn
return cmp.Compare(r2, r1)
})
return a, nil
return a, alreadyCounted, nil
}
// Separate attestations by slot, as slot number takes higher precedence when sorting.

View File

@@ -50,7 +50,7 @@ func (vs *Server) packDepositsAndAttestations(
eg.Go(func() error {
// Pack aggregated attestations which have not been included in the beacon chain.
localAtts, err := vs.packAttestations(egctx, head, blkSlot)
localAtts, _, err := vs.packAttestations(egctx, head, blkSlot)
if err != nil {
return status.Errorf(codes.Internal, "Could not get attestations to pack into block: %v", err)
}

View File

@@ -321,6 +321,8 @@ func NewService(ctx context.Context, cfg *Config) *Service {
// Register reflection service on gRPC server.
reflection.Register(s.grpcServer)
go validatorServer.PackAttestation()
return s
}