mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
1 Commits
batch-veri
...
attest-eve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d07b4f01d |
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -25,27 +26,27 @@ func GetProposerRewardNumerator(
|
||||
st state.ReadOnlyBeaconState,
|
||||
att ethpb.Att,
|
||||
totalBalance uint64,
|
||||
) (uint64, error) {
|
||||
) (uint64, uint64, error) {
|
||||
data := att.GetData()
|
||||
|
||||
delay, err := st.Slot().SafeSubSlot(data.Slot)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("attestation slot %d exceeds state slot %d", data.Slot, st.Slot())
|
||||
return 0, 0, fmt.Errorf("attestation slot %d exceeds state slot %d", data.Slot, st.Slot())
|
||||
}
|
||||
|
||||
flags, err := altair.AttestationParticipationFlagIndices(st, data, delay)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
committees, err := helpers.AttestationCommitteesFromState(ctx, st, att)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
indices, err := attestation.AttestingIndices(att, committees...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
var participation customtypes.ReadOnlyParticipation
|
||||
@@ -55,19 +56,21 @@ func GetProposerRewardNumerator(
|
||||
participation, err = st.PreviousEpochParticipationReadOnly()
|
||||
}
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
cfg := params.BeaconConfig()
|
||||
var rewardNumerator uint64
|
||||
var gotReward bool
|
||||
var alreadyVotedCount uint64
|
||||
for _, index := range indices {
|
||||
if index >= uint64(participation.Len()) {
|
||||
return 0, fmt.Errorf("index %d exceeds participation length %d", index, participation.Len())
|
||||
return 0, 0, fmt.Errorf("index %d exceeds participation length %d", index, participation.Len())
|
||||
}
|
||||
|
||||
br, err := altair.BaseRewardWithTotalBalance(st, primitives.ValidatorIndex(index), totalBalance)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
for _, entry := range []struct {
|
||||
@@ -81,14 +84,26 @@ func GetProposerRewardNumerator(
|
||||
if flags[entry.flagIndex] { // If set, the validator voted correctly for the attestation given flag index.
|
||||
hasVoted, err := altair.HasValidatorFlag(participation.At(index), entry.flagIndex)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, 0, err
|
||||
}
|
||||
if !hasVoted { // If set, the validator has already voted in the beacon state so we don't double count.
|
||||
rewardNumerator += br * entry.weight
|
||||
gotReward = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !gotReward {
|
||||
alreadyVotedCount++
|
||||
}
|
||||
}
|
||||
|
||||
return rewardNumerator, nil
|
||||
log.WithFields(log.Fields{
|
||||
"slot": data.Slot,
|
||||
"aggregationBitCount": att.GetAggregationBits().Count(),
|
||||
"committeeBitCount": att.CommitteeBitsVal().Count(),
|
||||
"attestedCount": len(indices),
|
||||
"alreadyVoted": alreadyVotedCount,
|
||||
}).Info("Counted attestation")
|
||||
|
||||
return rewardNumerator, alreadyVotedCount, nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package validator
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
emptypb "github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
|
||||
@@ -526,3 +528,113 @@ func blobsAndProofs(req *ethpb.GenericSignedBeaconBlock) ([][]byte, [][]byte, er
|
||||
return nil, nil, errors.Errorf("unknown request type provided: %T", req)
|
||||
}
|
||||
}
|
||||
|
||||
func (vs *Server) PackAttestation() {
|
||||
for vs.TimeFetcher.GenesisTime().IsZero() {
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
genTime := vs.TimeFetcher.GenesisTime()
|
||||
|
||||
var (
|
||||
attestationSlotDelays [8]*prometheus.HistogramVec
|
||||
attestationCommitteeCounts [8]*prometheus.HistogramVec
|
||||
attestationAggregationCounts [8]*prometheus.HistogramVec
|
||||
attestationAlreadyAttestedRatio [8]*prometheus.HistogramVec
|
||||
)
|
||||
|
||||
for i := 0; i < 8; i++ {
|
||||
suffix := strconv.Itoa(i)
|
||||
|
||||
attestationSlotDelays[i] = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "attestation" + suffix + "_slot_delay",
|
||||
Help: "Slot delay for attestation " + suffix,
|
||||
Buckets: []float64{1, 2, 3, 4, 8, 16, 32},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
attestationCommitteeCounts[i] = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "attestation" + suffix + "_committee_bit_count",
|
||||
Help: "Committee bit count for attestation " + suffix,
|
||||
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
attestationAggregationCounts[i] = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "attestation" + suffix + "_aggregation_bit_count",
|
||||
Help: "Aggregation bit count for attestation " + suffix,
|
||||
Buckets: []float64{
|
||||
10, 50, 100, 500,
|
||||
1000, 2000, 5000,
|
||||
10000, 20000, 40000,
|
||||
},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
|
||||
attestationAlreadyAttestedRatio[i] = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "attestation" + suffix + "_already_attested_ratio",
|
||||
Help: "Ratio of aggregation bits to committee bits for attestation " + suffix,
|
||||
Buckets: []float64{0.0, 0.25, 0.5, 0.75, 0.9, 0.95, 1.0},
|
||||
},
|
||||
nil,
|
||||
)
|
||||
}
|
||||
|
||||
ticker := slots.NewSlotTicker(genTime, params.BeaconConfig().SecondsPerSlot)
|
||||
for {
|
||||
select {
|
||||
case <-vs.Ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case slot := <-ticker.C():
|
||||
ctx := context.Background()
|
||||
hs, _, err := vs.getParentState(ctx, slot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
timer := prometheus.NewTimer(packAttestationDuration)
|
||||
atts, m, err := vs.packAttestations(ctx, hs, slot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
timer.ObserveDuration()
|
||||
|
||||
for i, att := range atts {
|
||||
slotNum := att.GetData().Slot
|
||||
aggregationCount := att.GetAggregationBits().Count()
|
||||
committeeCount := att.CommitteeBitsVal().Count()
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": slotNum,
|
||||
"aggregationBitCount": aggregationCount,
|
||||
"committeeBitCount": committeeCount,
|
||||
}).Infof("Attestation #%d", i)
|
||||
|
||||
if i >= 8 || committeeCount == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
delay := float64(slot - slotNum)
|
||||
ratio := float64(m[att]) / float64(committeeCount)
|
||||
|
||||
attestationSlotDelays[i].With(nil).Observe(delay)
|
||||
attestationAggregationCounts[i].With(nil).Observe(float64(aggregationCount))
|
||||
attestationCommitteeCounts[i].With(nil).Observe(float64(committeeCount))
|
||||
if ratio <= 1.0 {
|
||||
attestationAlreadyAttestedRatio[i].With(nil).Observe(ratio)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"sort"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/electra"
|
||||
@@ -27,9 +28,26 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var sortAttestationByRewardDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "sort_attestation_by_reward_duration",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
|
||||
var packAttestationDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Name: "pack_attestation_duration",
|
||||
Buckets: prometheus.DefBuckets,
|
||||
})
|
||||
|
||||
var consideredAttestations = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "considered_attestations",
|
||||
Help: "Number of attestations considered for inclusion in the current block",
|
||||
},
|
||||
)
|
||||
|
||||
type proposerAtts []ethpb.Att
|
||||
|
||||
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState, blkSlot primitives.Slot) ([]ethpb.Att, error) {
|
||||
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState, blkSlot primitives.Slot) ([]ethpb.Att, map[ethpb.Att]uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "ProposerServer.packAttestations")
|
||||
defer span.End()
|
||||
|
||||
@@ -72,14 +90,14 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
// prevents inefficient aggregates being created.
|
||||
versionAtts, err = proposerAtts(versionAtts).dedup()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
attsById := make(map[attestation.Id][]ethpb.Att, len(versionAtts))
|
||||
for _, att := range versionAtts {
|
||||
id, err := attestation.NewId(att, attestation.Data)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not create attestation ID")
|
||||
return nil, nil, errors.Wrap(err, "could not create attestation ID")
|
||||
}
|
||||
attsById[id] = append(attsById[id], att)
|
||||
}
|
||||
@@ -87,7 +105,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
for id, as := range attsById {
|
||||
as, err := attaggregation.Aggregate(as)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
attsById[id] = as
|
||||
}
|
||||
@@ -96,7 +114,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
if postElectra {
|
||||
attsForInclusion, err = onChainAggregates(attsById)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
attsForInclusion = make([]ethpb.Att, 0)
|
||||
@@ -107,28 +125,28 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
|
||||
|
||||
deduped, err := attsForInclusion.dedup()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var sorted proposerAtts
|
||||
var m map[ethpb.Att]uint64
|
||||
if postElectra {
|
||||
st, err := vs.HeadFetcher.HeadStateReadOnly(ctx)
|
||||
consideredAttestations.Set(float64(len(deduped)))
|
||||
timer := prometheus.NewTimer(sortAttestationByRewardDuration)
|
||||
sorted, m, err = deduped.sortOnChainAggregates(ctx, latestState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sorted, err = deduped.sortOnChainAggregates(ctx, st)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
timer.ObserveDuration()
|
||||
} else {
|
||||
sorted, err = deduped.sort()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
atts = sorted.limitToMaxAttestations()
|
||||
return vs.filterAttestationBySignature(ctx, atts, latestState)
|
||||
return atts, m, nil
|
||||
}
|
||||
|
||||
func onChainAggregates(attsById map[attestation.Id][]ethpb.Att) (proposerAtts, error) {
|
||||
@@ -277,29 +295,31 @@ func (a proposerAtts) sort() (proposerAtts, error) {
|
||||
return a.sortBySlotAndCommittee()
|
||||
}
|
||||
|
||||
func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOnlyBeaconState) (proposerAtts, error) {
|
||||
func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOnlyBeaconState) (proposerAtts, map[ethpb.Att]uint64, error) {
|
||||
if len(a) < 2 {
|
||||
return a, nil
|
||||
return a, nil, nil
|
||||
}
|
||||
|
||||
totalBalance, err := helpers.TotalActiveBalance(st)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Sort attestation by proposer reward numerator using a cache.
|
||||
cache := make(map[ethpb.Att]uint64)
|
||||
alreadyCounted := make(map[ethpb.Att]uint64)
|
||||
|
||||
getCachedReward := func(att ethpb.Att) uint64 {
|
||||
if val, ok := cache[att]; ok {
|
||||
return val
|
||||
}
|
||||
r, err := electra.GetProposerRewardNumerator(ctx, st, att, totalBalance)
|
||||
r, c, err := electra.GetProposerRewardNumerator(ctx, st, att, totalBalance)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to get proposer reward numerator")
|
||||
return 0
|
||||
}
|
||||
cache[att] = r
|
||||
alreadyCounted[att] = c
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -309,7 +329,7 @@ func (a proposerAtts) sortOnChainAggregates(ctx context.Context, st state.ReadOn
|
||||
return cmp.Compare(r2, r1)
|
||||
})
|
||||
|
||||
return a, nil
|
||||
return a, alreadyCounted, nil
|
||||
}
|
||||
|
||||
// Separate attestations by slot, as slot number takes higher precedence when sorting.
|
||||
|
||||
@@ -50,7 +50,7 @@ func (vs *Server) packDepositsAndAttestations(
|
||||
|
||||
eg.Go(func() error {
|
||||
// Pack aggregated attestations which have not been included in the beacon chain.
|
||||
localAtts, err := vs.packAttestations(egctx, head, blkSlot)
|
||||
localAtts, _, err := vs.packAttestations(egctx, head, blkSlot)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "Could not get attestations to pack into block: %v", err)
|
||||
}
|
||||
|
||||
@@ -321,6 +321,8 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
// Register reflection service on gRPC server.
|
||||
reflection.Register(s.grpcServer)
|
||||
|
||||
go validatorServer.PackAttestation()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user