Compare commits

..

3 Commits

Author          SHA1          Message                                                Date
terencechain    2fa646e7c5    Merge branch 'develop' into new-slot-before-propose    2022-05-20 06:54:01 -07:00
terence tsao    90df2e4d00    Add new slot lock                                      2022-05-19 13:41:39 -07:00
terence tsao    c43ad9d561    Call NewSlot before propose block                      2022-05-19 11:37:56 -07:00
11 changed files with 337 additions and 339 deletions

.github/CODEOWNERS vendored
View File

@@ -14,7 +14,3 @@ deps.bzl @prysmaticlabs/core-team
/beacon-chain/state/v2/ @rkapka @nisdas
/beacon-chain/state/v3/ @rkapka @nisdas
/beacon-chain/state/state-native/ @rkapka @nisdas
# Setting new static analysis exclusions generally should not occur.
# Any additions of exclusions should be approved by Preston and/or Nishant.
nogo_config.json @prestonvanloon @nisdas

View File

@@ -128,13 +128,13 @@ nogo(
"//tools/analyzers/recursivelock:go_default_library",
"//tools/analyzers/shadowpredecl:go_default_library",
"//tools/analyzers/slicedirect:go_default_library",
"//tools/analyzers/uintcast:go_default_library",
] + select({
# nogo checks that fail with coverage enabled.
":coverage_enabled": [],
"//conditions:default": [
"@org_golang_x_tools//go/analysis/passes/lostcancel:go_default_library",
"@org_golang_x_tools//go/analysis/passes/composite:go_default_library",
"//tools/analyzers/uintcast:go_default_library",
],
}),
)

View File

@@ -32,8 +32,9 @@ type ChainInfoFetcher interface {
}
// HeadUpdater defines a common interface for methods in blockchain service
// which allow to update the head info
// which allow to update the head info and update propose boost score and justified check point via on_tick.
type HeadUpdater interface {
NewSlot(ctx context.Context, slot types.Slot) error
UpdateHead(context.Context) error
}
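
Note on the hunk above: HeadUpdater now couples UpdateHead with NewSlot, so a caller can depend on one narrow interface for per-slot fork-choice housekeeping. A minimal sketch of how a consumer might hold and compile-time-check such an interface; the Slot alias, proposerServer, and blockchainService names below are illustrative, not Prysm's:

package blockchain

import "context"

// Slot stands in for the eth2 types.Slot used by the real interface.
type Slot uint64

// HeadUpdater mirrors the interface in the hunk above.
type HeadUpdater interface {
    NewSlot(ctx context.Context, slot Slot) error
    UpdateHead(ctx context.Context) error
}

// proposerServer keeps the dependency behind the narrow interface, which
// decouples the RPC layer from the concrete blockchain service and makes it
// easy to stub in tests.
type proposerServer struct {
    HeadUpdater HeadUpdater
}

// blockchainService is a placeholder implementation.
type blockchainService struct{}

func (*blockchainService) NewSlot(context.Context, Slot) error { return nil }
func (*blockchainService) UpdateHead(context.Context) error    { return nil }

// Compile-time check that the service satisfies the interface.
var _ HeadUpdater = (*blockchainService)(nil)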

View File

@@ -30,6 +30,8 @@ import (
// if ancestor_at_finalized_slot == store.finalized_checkpoint.root:
// store.justified_checkpoint = store.best_justified_checkpoint
func (s *Service) NewSlot(ctx context.Context, slot types.Slot) error {
s.newSlotLock.Lock()
defer s.newSlotLock.Unlock()
// Reset proposer boost root in fork choice.
if err := s.cfg.ForkChoiceStore.ResetBoostedProposerRoot(ctx); err != nil {
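
The spec comment above is the tail of the on_tick rule this method implements: reset the proposer boost root each slot and, at an epoch boundary, promote best_justified_checkpoint to justified_checkpoint when the best-justified chain contains the finalized checkpoint. A simplified standalone sketch of that rule; Store, Checkpoint, and onNewSlot are stand-ins, not Prysm's fork-choice types:

package forkchoice

import "bytes"

// Checkpoint and Store are cut-down stand-ins for the fork-choice store.
type Checkpoint struct {
    Epoch uint64
    Root  []byte
}

type Store struct {
    ProposerBoostRoot       []byte
    JustifiedCheckpoint     Checkpoint
    BestJustifiedCheckpoint Checkpoint
    FinalizedCheckpoint     Checkpoint
}

// onNewSlot applies the per-slot housekeeping sketched in the comment above.
// ancestorAtFinalizedSlot is the ancestor of the best-justified root at the
// finalized slot, which in the real code comes from fork choice.
func onNewSlot(s *Store, atEpochStart bool, ancestorAtFinalizedSlot []byte) {
    // Reset the proposer boost root on every new slot.
    s.ProposerBoostRoot = nil
    if !atEpochStart {
        return
    }
    // Pull up justification at the epoch boundary when the better checkpoint
    // sits on the finalized chain.
    if s.BestJustifiedCheckpoint.Epoch > s.JustifiedCheckpoint.Epoch &&
        bytes.Equal(ancestorAtFinalizedSlot, s.FinalizedCheckpoint.Root) {
        s.JustifiedCheckpoint = s.BestJustifiedCheckpoint
    }
}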

View File

@@ -66,6 +66,7 @@ type Service struct {
wsVerifier *WeakSubjectivityVerifier
store *store.Store
processAttestationsLock sync.Mutex
newSlotLock sync.Mutex
}
// config options for the service.
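
The new field follows the same convention as processAttestationsLock: one dedicated mutex per independent critical section, presumably so NewSlot calls serialize against each other without blocking attestation processing. A generic sketch of the pattern; the names below are illustrative only:

package blockchain

import "sync"

// service shows the per-concern mutex pattern: each lock guards exactly one
// operation's shared state, so unrelated operations never contend. The zero
// value of sync.Mutex is ready to use, so no constructor changes are needed.
type service struct {
    processAttestationsLock sync.Mutex
    newSlotLock             sync.Mutex
}

func (s *service) newSlot() {
    s.newSlotLock.Lock()
    defer s.newSlotLock.Unlock()
    // per-slot fork-choice housekeeping would run here
}

func (s *service) processAttestations() {
    s.processAttestationsLock.Lock()
    defer s.processAttestationsLock.Unlock()
    // attestation processing would run here
}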

View File

@@ -456,3 +456,8 @@ func (s *ChainService) UpdateHead(_ context.Context) error { return nil }
// ReceiveAttesterSlashing mocks the same method in the chain service.
func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {}
// NewSlot mocks the same method in the chain service.
func (s *ChainService) NewSlot(context.Context, types.Slot) error {
return nil
}
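
The no-op mock above lets RPC tests hand a ChainService to anything that expects the HeadUpdater interface. When a test also needs to assert that NewSlot was actually called, a small recording double works the same way; the sketch below uses only local types and the standard testing package, nothing from Prysm's mock package:

package blockchain_test

import (
    "context"
    "testing"
)

type Slot uint64

type headUpdater interface {
    NewSlot(ctx context.Context, slot Slot) error
    UpdateHead(ctx context.Context) error
}

// recordingChain satisfies headUpdater and records the slots NewSlot saw.
type recordingChain struct {
    newSlotCalls []Slot
}

func (r *recordingChain) NewSlot(_ context.Context, s Slot) error {
    r.newSlotCalls = append(r.newSlotCalls, s)
    return nil
}

func (r *recordingChain) UpdateHead(context.Context) error { return nil }

func TestNewSlotIsCalled(t *testing.T) {
    chain := &recordingChain{}
    var hu headUpdater = chain
    // Stand-in for the proposer path exercised by this change.
    if err := hu.NewSlot(context.Background(), 7); err != nil {
        t.Fatal(err)
    }
    if len(chain.newSlotCalls) != 1 {
        t.Fatalf("expected 1 NewSlot call, got %d", len(chain.newSlotCalls))
    }
}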

View File

@@ -27,12 +27,81 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
select {
case blockEvent := <-blocksChannel:
if req.VerifiedOnly {
if err := sendVerifiedBlocks(stream, blockEvent); err != nil {
return err
if blockEvent.Type == statefeed.BlockProcessed {
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
continue
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
continue
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
case version.Bellatrix:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
continue
}
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
} else {
if err := vs.sendBlocks(stream, blockEvent); err != nil {
return err
if blockEvent.Type == blockfeed.ReceivedBlock {
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
continue
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
continue
}
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
if err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not get head state")
continue
}
signed := data.SignedBlock
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not verify block signature")
continue
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock")
continue
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair")
continue
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
}
case <-blockSub.Err():
@@ -44,86 +113,3 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
}
}
}
func sendVerifiedBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
if blockEvent.Type != statefeed.BlockProcessed {
return nil
}
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
return nil
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
case version.Bellatrix:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
return nil
}
func (vs *Server) sendBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
if blockEvent.Type != blockfeed.ReceivedBlock {
return nil
}
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
return nil
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
return nil
}
log := log.WithField("blockSlot", data.SignedBlock.Block().Slot())
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
if err != nil {
log.WithError(err).Error("Could not get head state")
return nil
}
signed := data.SignedBlock
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
log.WithError(err).Error("Could not verify block signature")
return nil
}
b := &ethpb.StreamBlocksResponse{}
switch p := data.SignedBlock.Proto().(type) {
case *ethpb.SignedBeaconBlock:
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: p}
case *ethpb.SignedBeaconBlockAltair:
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: p}
case *ethpb.SignedBeaconBlockBellatrix:
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: p}
default:
log.Errorf("Unknown block type %T", p)
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
return nil
}
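
Both branches of the merged StreamBlocksAltair (and the helpers they replace) share one dispatch pattern: switch on the wrapper's Version(), type-assert Proto() to the concrete protobuf, and treat a mismatch as a loggable, skippable event rather than a fatal error. A simplified sketch of that pattern; signedBlock and the *Block types below are placeholders for Prysm's block wrapper and generated protobufs:

package rpc

import (
    "errors"
    "fmt"
)

// Fork versions, mirroring the version package used above.
const (
    phase0 = iota
    altair
    bellatrix
)

// signedBlock stands in for the versioned wrapper: it reports its fork
// version and exposes the underlying concrete message.
type signedBlock interface {
    Version() int
    Proto() interface{}
}

// Placeholder concrete message types.
type phase0Block struct{}
type altairBlock struct{}
type bellatrixBlock struct{}

// toResponseBlock shows the dispatch: switch on version, assert the expected
// concrete type, and surface a mismatch as an error the caller can log and skip.
func toResponseBlock(b signedBlock) (interface{}, error) {
    switch b.Version() {
    case phase0:
        blk, ok := b.Proto().(*phase0Block)
        if !ok {
            return nil, errors.New("version/type mismatch, expected phase0 block")
        }
        return blk, nil
    case altair:
        blk, ok := b.Proto().(*altairBlock)
        if !ok {
            return nil, errors.New("version/type mismatch, expected altair block")
        }
        return blk, nil
    case bellatrix:
        blk, ok := b.Proto().(*bellatrixBlock)
        if !ok {
            return nil, errors.New("version/type mismatch, expected bellatrix block")
        }
        return blk, nil
    default:
        return nil, fmt.Errorf("unknown block version %d", b.Version())
    }
}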

View File

@@ -82,6 +82,9 @@ func (vs *Server) buildPhase0BlockData(ctx context.Context, req *ethpb.BlockRequ
return nil, fmt.Errorf("syncing to latest head, not ready to respond")
}
if err := vs.HeadUpdater.NewSlot(ctx, req.Slot); err != nil {
log.WithError(err).Error("Could not update propose boost score")
}
if err := vs.HeadUpdater.UpdateHead(ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
}
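
This hunk is the behavioral core of the change: before building a block, the proposer first runs NewSlot (per-slot fork-choice housekeeping), then UpdateHead, and logs rather than returns on failure, presumably so a housekeeping problem does not cost a proposal. A hedged sketch of that ordering; prepareHeadForProposal and the interface below are illustrative, not the real buildPhase0BlockData:

package validator

import (
    "context"
    "log"
)

type Slot uint64

type headUpdater interface {
    NewSlot(ctx context.Context, slot Slot) error
    UpdateHead(ctx context.Context) error
}

// prepareHeadForProposal mirrors the ordering above: per-slot housekeeping
// first, head recomputation second, and neither error aborts block building.
func prepareHeadForProposal(ctx context.Context, hu headUpdater, slot Slot) {
    if err := hu.NewSlot(ctx, slot); err != nil {
        log.Printf("could not run per-slot fork-choice housekeeping: %v", err)
    }
    if err := hu.UpdateHead(ctx); err != nil {
        log.Printf("could not update head: %v", err)
    }
}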

View File

@@ -36,44 +36,6 @@ func deepValueEqual(v1, v2 reflect.Value, visited map[visit]bool, depth int) boo
if v1.Type() != v2.Type() {
return false
}
if compareAddr(v1, v2, visited) {
return true
}
switch v1.Kind() {
case reflect.Array:
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
case reflect.Slice:
return compareSlice(v1, v2, visited, depth)
case reflect.Interface:
if v1.IsNil() || v2.IsNil() {
return v1.IsNil() == v2.IsNil()
}
return deepValueEqual(v1.Elem(), v2.Elem(), visited, depth+1)
case reflect.Ptr:
if v1.Pointer() == v2.Pointer() {
return true
}
return deepValueEqual(v1.Elem(), v2.Elem(), visited, depth+1)
case reflect.Struct:
for i, n := 0, v1.NumField(); i < n; i++ {
if !deepValueEqual(v1.Field(i), v2.Field(i), visited, depth+1) {
return false
}
}
return true
default:
return deepValueBaseTypeEqual(v1, v2)
}
}
func compareAddr(v1, v2 reflect.Value, visited map[visit]bool) bool {
// We want to avoid putting more in the visited map than we need to.
// For any possible reference cycle that might be encountered,
// hard(t) needs to return true for at least one of the types in the cycle.
@@ -105,7 +67,57 @@ func compareAddr(v1, v2 reflect.Value, visited map[visit]bool) bool {
// Remember for later.
visited[v] = true
}
return false
switch v1.Kind() {
case reflect.Array:
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
case reflect.Slice:
if v1.IsNil() && v2.Len() == 0 {
return true
}
if v1.Len() == 0 && v2.IsNil() {
return true
}
if v1.IsNil() && v2.IsNil() {
return true
}
if v1.Len() != v2.Len() {
return false
}
if v1.Pointer() == v2.Pointer() {
return true
}
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
case reflect.Interface:
if v1.IsNil() || v2.IsNil() {
return v1.IsNil() == v2.IsNil()
}
return deepValueEqual(v1.Elem(), v2.Elem(), visited, depth+1)
case reflect.Ptr:
if v1.Pointer() == v2.Pointer() {
return true
}
return deepValueEqual(v1.Elem(), v2.Elem(), visited, depth+1)
case reflect.Struct:
for i, n := 0, v1.NumField(); i < n; i++ {
if !deepValueEqual(v1.Field(i), v2.Field(i), visited, depth+1) {
return false
}
}
return true
default:
return deepValueBaseTypeEqual(v1, v2)
}
}
func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, depth int) bool {
@@ -115,9 +127,35 @@ func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, de
if v1.Type() != v2.Type() {
return false
}
// We want to avoid putting more in the visited map than we need to.
// For any possible reference cycle that might be encountered,
// hard(t) needs to return true for at least one of the types in the cycle.
hard := func(k reflect.Kind) bool {
switch k {
case reflect.Slice, reflect.Ptr, reflect.Interface:
return true
}
return false
}
if compareAddr(v1, v2, visited) {
return true
if v1.CanAddr() && v2.CanAddr() && hard(v1.Kind()) {
addr1 := unsafe.Pointer(v1.UnsafeAddr()) // #nosec G103 -- Test compare only
addr2 := unsafe.Pointer(v2.UnsafeAddr()) // #nosec G103 -- Test compare only
if uintptr(addr1) > uintptr(addr2) {
// Canonicalize order to reduce number of entries in visited.
// Assumes non-moving garbage collector.
addr1, addr2 = addr2, addr1
}
// Short circuit if references are already seen.
typ := v1.Type()
v := visit{addr1, addr2, typ}
if visited[v] {
return true
}
// Remember for later.
visited[v] = true
}
switch v1.Kind() {
@@ -178,30 +216,6 @@ func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, de
}
}
func compareSlice(v1, v2 reflect.Value, visited map[visit]bool, depth int) bool {
if v1.IsNil() && v2.Len() == 0 {
return true
}
if v1.Len() == 0 && v2.IsNil() {
return true
}
if v1.IsNil() && v2.IsNil() {
return true
}
if v1.Len() != v2.Len() {
return false
}
if v1.Pointer() == v2.Pointer() {
return true
}
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
}
func deepValueBaseTypeEqual(v1, v2 reflect.Value) bool {
switch v1.Kind() {
case reflect.String:
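
The hunks above fold compareAddr and compareSlice back into deepValueEqual. The slice rule is the interesting part: nil and empty slices compare equal, identical backing arrays short-circuit, and only then are elements compared recursively; the visited-address bookkeeping exists to stop reference cycles from recursing forever. A compact, runnable sketch of just the slice rule, using reflect.DeepEqual in place of the recursive comparison:

package main

import (
    "fmt"
    "reflect"
)

// slicesLooselyEqual mirrors the slice case above: nil and empty slices are
// treated as equal, identical backing arrays short-circuit, and otherwise
// elements are compared pairwise.
func slicesLooselyEqual(a, b interface{}) bool {
    v1, v2 := reflect.ValueOf(a), reflect.ValueOf(b)
    if v1.Kind() != reflect.Slice || v2.Kind() != reflect.Slice || v1.Type() != v2.Type() {
        return false
    }
    if v1.Len() == 0 && v2.Len() == 0 {
        return true // covers nil/nil, nil/empty, empty/nil, empty/empty
    }
    if v1.Len() != v2.Len() {
        return false
    }
    if v1.Pointer() == v2.Pointer() {
        return true // same backing array
    }
    for i := 0; i < v1.Len(); i++ {
        if !reflect.DeepEqual(v1.Index(i).Interface(), v2.Index(i).Interface()) {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(slicesLooselyEqual([]int(nil), []int{})) // true, unlike reflect.DeepEqual
    fmt.Println(slicesLooselyEqual([]int{1, 2}, []int{1, 2}))
    fmt.Println(slicesLooselyEqual([]int{1}, []int{2}))
}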

View File

@@ -93,7 +93,57 @@ func run(pass *analysis.Pass) (interface{}, error) {
reportUnhandledError(pass, stmt.Call.Lparen, stmt.Call)
}
case *ast.AssignStmt:
inspectAssignStatement(pass, stmt)
if len(stmt.Rhs) == 1 {
// single value on rhs; check against lhs identifiers
if call, ok := stmt.Rhs[0].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
break
}
isError := errorsByArg(pass, call)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
// We shortcut calls to recover() because errorsByArg can't
// check its return types for errors since it returns interface{}.
if id.Name == "_" && (isRecover(pass, call) || isError[i]) {
reportUnhandledError(pass, id.NamePos, call)
}
}
}
} else if assert, ok := stmt.Rhs[0].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// type switch
break
}
if len(stmt.Lhs) < 2 {
// assertion result not read
reportUnhandledTypeAssertion(pass, stmt.Rhs[0].Pos())
} else if id, ok := stmt.Lhs[1].(*ast.Ident); ok && id.Name == "_" {
// assertion result ignored
reportUnhandledTypeAssertion(pass, id.NamePos)
}
}
} else {
// multiple value on rhs; in this case a call can't return
// multiple values. Assume len(stmt.Lhs) == len(stmt.Rhs)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
if call, ok := stmt.Rhs[i].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
continue
}
if id.Name == "_" && callReturnsError(pass, call) {
reportUnhandledError(pass, id.NamePos, call)
}
} else if assert, ok := stmt.Rhs[i].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// Shouldn't happen anyway, no multi assignment in type switches
continue
}
reportUnhandledError(pass, id.NamePos, nil)
}
}
}
}
default:
}
})
@@ -101,59 +151,6 @@ func run(pass *analysis.Pass) (interface{}, error) {
return nil, nil
}
func inspectAssignStatement(pass *analysis.Pass, stmt *ast.AssignStmt) {
if len(stmt.Rhs) == 1 {
// single value on rhs; check against lhs identifiers
if call, ok := stmt.Rhs[0].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
return
}
isError := errorsByArg(pass, call)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
// We shortcut calls to recover() because errorsByArg can't
// check its return types for errors since it returns interface{}.
if id.Name == "_" && (isRecover(pass, call) || isError[i]) {
reportUnhandledError(pass, id.NamePos, call)
}
}
}
} else if assert, ok := stmt.Rhs[0].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
return
}
if len(stmt.Lhs) < 2 {
// assertion result not read
reportUnhandledTypeAssertion(pass, stmt.Rhs[0].Pos())
} else if id, ok := stmt.Lhs[1].(*ast.Ident); ok && id.Name == "_" {
// assertion result ignored
reportUnhandledTypeAssertion(pass, id.NamePos)
}
}
} else {
// multiple value on rhs; in this case a call can't return
// multiple values. Assume len(stmt.Lhs) == len(stmt.Rhs)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
if call, ok := stmt.Rhs[i].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
continue
}
if id.Name == "_" && callReturnsError(pass, call) {
reportUnhandledError(pass, id.NamePos, call)
}
} else if assert, ok := stmt.Rhs[i].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// Shouldn't happen anyway, no multi assignment in type switches
continue
}
reportUnhandledError(pass, id.NamePos, nil)
}
}
}
}
}
func reportUnhandledError(pass *analysis.Pass, pos token.Pos, call *ast.CallExpr) {
pass.Reportf(pos, "Unhandled error for function call %s", fullName(pass, call))
}
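
The change above inlines inspectAssignStatement into run; the logic itself is an errcheck-style pass that walks assignments looking for error-producing calls (or type assertions) discarded into the blank identifier. A deliberately simplified, runnable walker in the same spirit; unlike the real pass, it has no type information (errorsByArg, ignoreCall, callReturnsError), so it flags any call result discarded into _:

package main

import (
    "fmt"
    "go/ast"
    "go/parser"
    "go/token"
)

const src = `package p
func f() error { return nil }
func g() {
	_ = f()
	err := f()
	_ = err
}`

func main() {
    fset := token.NewFileSet()
    file, err := parser.ParseFile(fset, "example.go", src, 0)
    if err != nil {
        panic(err)
    }
    ast.Inspect(file, func(n ast.Node) bool {
        assign, ok := n.(*ast.AssignStmt)
        if !ok || len(assign.Rhs) != 1 {
            return true
        }
        // Only look at single-call right-hand sides, as in the hunk above.
        if _, isCall := assign.Rhs[0].(*ast.CallExpr); !isCall {
            return true
        }
        for _, lhs := range assign.Lhs {
            if id, ok := lhs.(*ast.Ident); ok && id.Name == "_" {
                fmt.Printf("%s: call result discarded into _\n", fset.Position(id.NamePos))
            }
        }
        return true
    })
}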

View File

@@ -257,10 +257,128 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot types.S
v.voteStats.startEpoch = prevEpoch
}
}
gweiPerEth := float64(params.BeaconConfig().GweiPerEth)
v.prevBalanceLock.Lock()
for i, pubKey := range resp.PublicKeys {
v.logValidatorGainsAndLossesPerKey(slot, prevEpoch, pubKey, i, resp)
truncatedKey := fmt.Sprintf("%#x", bytesutil.Trunc(pubKey))
pubKeyBytes := bytesutil.ToBytes48(pubKey)
if slot < params.BeaconConfig().SlotsPerEpoch {
v.prevBalance[pubKeyBytes] = params.BeaconConfig().MaxEffectiveBalance
}
// Safely load data from response with slice out of bounds checks. The server should return
// the response with all slices of equal length, but the validator could panic if the server
// did not do so for whatever reason.
var balBeforeEpoch uint64
var balAfterEpoch uint64
var correctlyVotedSource bool
var correctlyVotedTarget bool
var correctlyVotedHead bool
if i < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing balance before epoch transition")
}
if i < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing balance after epoch transition")
}
if i < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted source")
}
if i < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted target")
}
if i < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted head")
}
if _, ok := v.startBalances[pubKeyBytes]; !ok {
v.startBalances[pubKeyBytes] = balBeforeEpoch
}
fmtKey := fmt.Sprintf("%#x", pubKey)
if v.prevBalance[pubKeyBytes] > 0 {
newBalance := float64(balAfterEpoch) / gweiPerEth
prevBalance := float64(balBeforeEpoch) / gweiPerEth
startBalance := float64(v.startBalances[pubKeyBytes]) / gweiPerEth
percentNet := (newBalance - prevBalance) / prevBalance
percentSinceStart := (newBalance - startBalance) / startBalance
previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
"correctlyVotedHead": correctlyVotedHead,
"startBalance": startBalance,
"oldBalance": prevBalance,
"newBalance": newBalance,
"percentChange": fmt.Sprintf("%.5f%%", percentNet*100),
"percentChangeSinceStart": fmt.Sprintf("%.5f%%", percentSinceStart*100),
}
// These fields are deprecated after Altair.
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionSlots) {
previousEpochSummaryFields["inclusionSlot"] = resp.InclusionSlots[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inclusion slot")
}
if i < len(resp.InclusionDistances) {
previousEpochSummaryFields["inclusionDistance"] = resp.InclusionDistances[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inclusion distance")
}
}
if slots.ToEpoch(slot) >= params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inactivity score")
}
}
log.WithFields(previousEpochSummaryFields).Info("Previous epoch voting summary")
if v.emitAccountMetrics {
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(newBalance)
if correctlyVotedSource {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedTarget {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedHead {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(0)
}
// Phase0 specific metrics
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionDistances) {
ValidatorInclusionDistancesGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InclusionDistances[i]))
}
} else { // Altair specific metrics.
// Reset phase0 fields that no longer apply
ValidatorInclusionDistancesGaugeVec.DeleteLabelValues(fmtKey)
if i < len(resp.InactivityScores) {
ValidatorInactivityScoreGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InactivityScores[i]))
}
}
}
}
v.prevBalance[pubKeyBytes] = balBeforeEpoch
}
v.prevBalanceLock.Unlock()
@@ -268,131 +386,6 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot types.S
return nil
}
// logValidatorGainsAndLossesPerKey requires the v.prevBalanceLock to be held prior to calling this method.
func (v *validator) logValidatorGainsAndLossesPerKey(slot types.Slot, prevEpoch types.Epoch, pubKey []byte, i int, resp *ethpb.ValidatorPerformanceResponse) {
gweiPerEth := float64(params.BeaconConfig().GweiPerEth)
truncatedKey := fmt.Sprintf("%#x", bytesutil.Trunc(pubKey))
pubKeyBytes := bytesutil.ToBytes48(pubKey)
if slot < params.BeaconConfig().SlotsPerEpoch {
v.prevBalance[pubKeyBytes] = params.BeaconConfig().MaxEffectiveBalance
}
log := log.WithField("pubKey", truncatedKey)
// Safely load data from response with slice out of bounds checks. The server should return
// the response with all slices of equal length, but the validator could panic if the server
// did not do so for whatever reason.
var balBeforeEpoch uint64
var balAfterEpoch uint64
var correctlyVotedSource bool
var correctlyVotedTarget bool
var correctlyVotedHead bool
if i < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[i]
} else {
log.Warn("Missing balance before epoch transition")
}
if i < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[i]
}
if i < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[i]
} else {
log.Warn("Missing correctly voted source")
}
if i < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[i]
} else {
log.Warn("Missing correctly voted target")
}
if i < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[i]
} else {
log.Warn("Missing correctly voted head")
}
if _, ok := v.startBalances[pubKeyBytes]; !ok {
v.startBalances[pubKeyBytes] = balBeforeEpoch
}
fmtKey := fmt.Sprintf("%#x", pubKey)
if v.prevBalance[pubKeyBytes] > 0 {
newBalance := float64(balAfterEpoch) / gweiPerEth
prevBalance := float64(balBeforeEpoch) / gweiPerEth
startBalance := float64(v.startBalances[pubKeyBytes]) / gweiPerEth
percentNet := (newBalance - prevBalance) / prevBalance
percentSinceStart := (newBalance - startBalance) / startBalance
previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
"correctlyVotedHead": correctlyVotedHead,
"startBalance": startBalance,
"oldBalance": prevBalance,
"newBalance": newBalance,
"percentChange": fmt.Sprintf("%.5f%%", percentNet*100),
"percentChangeSinceStart": fmt.Sprintf("%.5f%%", percentSinceStart*100),
}
// These fields are deprecated after Altair.
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionSlots) {
previousEpochSummaryFields["inclusionSlot"] = resp.InclusionSlots[i]
} else {
log.Warn("Missing inclusion slot")
}
if i < len(resp.InclusionDistances) {
previousEpochSummaryFields["inclusionDistance"] = resp.InclusionDistances[i]
} else {
log.Warn("Missing inclusion distance")
}
}
if slots.ToEpoch(slot) >= params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[i]
} else {
log.Warn("Missing inactivity score")
}
}
log.WithFields(previousEpochSummaryFields).Info("Previous epoch voting summary")
if v.emitAccountMetrics {
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(newBalance)
if correctlyVotedSource {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedTarget {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedHead {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(0)
}
// Phase0 specific metrics
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionDistances) {
ValidatorInclusionDistancesGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InclusionDistances[i]))
}
} else { // Altair specific metrics.
// Reset phase0 fields that no longer apply
ValidatorInclusionDistancesGaugeVec.DeleteLabelValues(fmtKey)
if i < len(resp.InactivityScores) {
ValidatorInactivityScoreGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InactivityScores[i]))
}
}
}
}
v.prevBalance[pubKeyBytes] = balBeforeEpoch
}
// UpdateLogAggregateStats updates and logs the voteStats struct of a validator using the RPC response obtained from LogValidatorGainsAndLosses.
func (v *validator) UpdateLogAggregateStats(resp *ethpb.ValidatorPerformanceResponse, slot types.Slot) {
summary := &v.voteStats
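
A closing note on the validator metrics hunk above: the inlined loop repeats one defensive pattern throughout, indexing each parallel slice of the performance response only after a bounds check, then converting Gwei balances to ETH before computing percent change. A condensed sketch of those two pieces; perfResponse and its fields are simplified stand-ins for the protobuf response:

package main

import (
    "fmt"
    "log"
)

const gweiPerEth = 1e9

// perfResponse is a cut-down stand-in for the performance response: parallel
// slices that are expected to be the same length but might not be.
type perfResponse struct {
    PublicKeys                    [][]byte
    BalancesBeforeEpochTransition []uint64
    BalancesAfterEpochTransition  []uint64
}

// balancesAt indexes the parallel slices defensively, mirroring the bounds
// checks above: a short slice produces a warning and a zero value instead of
// an index-out-of-range panic.
func balancesAt(resp *perfResponse, i int) (before, after uint64) {
    if i < len(resp.BalancesBeforeEpochTransition) {
        before = resp.BalancesBeforeEpochTransition[i]
    } else {
        log.Printf("missing balance before epoch transition for key %d", i)
    }
    if i < len(resp.BalancesAfterEpochTransition) {
        after = resp.BalancesAfterEpochTransition[i]
    } else {
        log.Printf("missing balance after epoch transition for key %d", i)
    }
    return before, after
}

func main() {
    resp := &perfResponse{
        PublicKeys:                    [][]byte{{0x01}, {0x02}},
        BalancesBeforeEpochTransition: []uint64{32_000_000_000, 32_000_000_000},
        BalancesAfterEpochTransition:  []uint64{32_010_000_000}, // deliberately short
    }
    for i := range resp.PublicKeys {
        before, after := balancesAt(resp, i)
        prev := float64(before) / gweiPerEth
        next := float64(after) / gweiPerEth
        fmt.Printf("key %d: %.9f ETH -> %.9f ETH (%.5f%%)\n", i, prev, next, (next-prev)/prev*100)
    }
}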