Compare commits

...

7 Commits

Author SHA1 Message Date
prestonvanloon
808ee6b039 Merge branch 'develop' of github.com:prysmaticlabs/prysm into complexity-reduction 2022-05-20 07:21:34 -05:00
prestonvanloon
3f6dc6ddf8 Merge branch 'develop' of github.com:prysmaticlabs/prysm into complexity-reduction 2022-05-09 14:56:00 -05:00
prestonvanloon
8b9b9a4700 refactor block streaming, add bellatrix 2022-04-22 14:46:43 -05:00
prestonvanloon
310b5325ef Refactor some metrics logging to be slightly less complex 2022-04-22 14:27:26 -05:00
Preston Van Loon
e8ee1093ae Lower threshold to 90 2022-04-21 13:49:48 +00:00
Preston Van Loon
5c404e823b Merge branch 'develop' into complexity-reduction 2022-04-18 09:34:44 -05:00
prestonvanloon
d55846c050 Lower complexity threshold to 100, fix a few complexity issues 2022-04-18 09:22:28 -05:00
6 changed files with 316 additions and 302 deletions

4
.github/CODEOWNERS vendored
View File

@@ -14,3 +14,7 @@ deps.bzl @prysmaticlabs/core-team
/beacon-chain/state/v2/ @rkapka @nisdas
/beacon-chain/state/v3/ @rkapka @nisdas
/beacon-chain/state/state-native/ @rkapka @nisdas
# Setting new static analysis exclusions generally should not occur.
# Any additions of exclusions should be approved by Preston and/or Nishant.
nogo_config.json @prestonvanloon @nisdas

View File

@@ -128,13 +128,13 @@ nogo(
"//tools/analyzers/recursivelock:go_default_library",
"//tools/analyzers/shadowpredecl:go_default_library",
"//tools/analyzers/slicedirect:go_default_library",
"//tools/analyzers/uintcast:go_default_library",
] + select({
# nogo checks that fail with coverage enabled.
":coverage_enabled": [],
"//conditions:default": [
"@org_golang_x_tools//go/analysis/passes/lostcancel:go_default_library",
"@org_golang_x_tools//go/analysis/passes/composite:go_default_library",
"//tools/analyzers/uintcast:go_default_library",
],
}),
)

View File

@@ -27,81 +27,12 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
select {
case blockEvent := <-blocksChannel:
if req.VerifiedOnly {
if blockEvent.Type == statefeed.BlockProcessed {
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
continue
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
continue
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
case version.Bellatrix:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
continue
}
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
if err := sendVerifiedBlocks(stream, blockEvent); err != nil {
return err
}
} else {
if blockEvent.Type == blockfeed.ReceivedBlock {
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
continue
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
continue
}
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
if err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not get head state")
continue
}
signed := data.SignedBlock
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
log.WithError(err).WithField("blockSlot", data.SignedBlock.Block().Slot()).Error("Could not verify block signature")
continue
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting *ethpb.SignedBeaconBlock")
continue
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting *v2.SignedBeaconBlockAltair")
continue
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
if err := vs.sendBlocks(stream, blockEvent); err != nil {
return err
}
}
case <-blockSub.Err():
@@ -113,3 +44,86 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
}
}
}
func sendVerifiedBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
if blockEvent.Type != statefeed.BlockProcessed {
return nil
}
data, ok := blockEvent.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
return nil
}
b := &ethpb.StreamBlocksResponse{}
switch data.SignedBlock.Version() {
case version.Phase0:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlock)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlock")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: phBlk}
case version.Altair:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockAltair)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockAltair")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: phBlk}
case version.Bellatrix:
phBlk, ok := data.SignedBlock.Proto().(*ethpb.SignedBeaconBlockBellatrix)
if !ok {
log.Warn("Mismatch between version and block type, was expecting SignedBeaconBlockBellatrix")
return nil
}
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: phBlk}
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
return nil
}
func (vs *Server) sendBlocks(stream ethpb.BeaconNodeValidator_StreamBlocksAltairServer, blockEvent *feed.Event) error {
if blockEvent.Type != blockfeed.ReceivedBlock {
return nil
}
data, ok := blockEvent.Data.(*blockfeed.ReceivedBlockData)
if !ok {
// Got bad data over the stream.
return nil
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
return nil
}
log := log.WithField("blockSlot", data.SignedBlock.Block().Slot())
headState, err := vs.HeadFetcher.HeadState(vs.Ctx)
if err != nil {
log.WithError(err).Error("Could not get head state")
return nil
}
signed := data.SignedBlock
if err := blocks.VerifyBlockSignature(headState, signed.Block().ProposerIndex(), signed.Signature(), signed.Block().HashTreeRoot); err != nil {
log.WithError(err).Error("Could not verify block signature")
return nil
}
b := &ethpb.StreamBlocksResponse{}
switch p := data.SignedBlock.Proto().(type) {
case *ethpb.SignedBeaconBlock:
b.Block = &ethpb.StreamBlocksResponse_Phase0Block{Phase0Block: p}
case *ethpb.SignedBeaconBlockAltair:
b.Block = &ethpb.StreamBlocksResponse_AltairBlock{AltairBlock: p}
case *ethpb.SignedBeaconBlockBellatrix:
b.Block = &ethpb.StreamBlocksResponse_BellatrixBlock{BellatrixBlock: p}
default:
log.Errorf("Unknown block type %T", p)
}
if err := stream.Send(b); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
return nil
}

View File

@@ -36,36 +36,9 @@ func deepValueEqual(v1, v2 reflect.Value, visited map[visit]bool, depth int) boo
if v1.Type() != v2.Type() {
return false
}
// We want to avoid putting more in the visited map than we need to.
// For any possible reference cycle that might be encountered,
// hard(t) needs to return true for at least one of the types in the cycle.
hard := func(k reflect.Kind) bool {
switch k {
case reflect.Slice, reflect.Ptr, reflect.Interface:
return true
}
return false
}
if v1.CanAddr() && v2.CanAddr() && hard(v1.Kind()) {
addr1 := unsafe.Pointer(v1.UnsafeAddr()) // #nosec G103 -- Test compare only
addr2 := unsafe.Pointer(v2.UnsafeAddr()) // #nosec G103 -- Test compare only
if uintptr(addr1) > uintptr(addr2) {
// Canonicalize order to reduce number of entries in visited.
// Assumes non-moving garbage collector.
addr1, addr2 = addr2, addr1
}
// Short circuit if references are already seen.
typ := v1.Type()
v := visit{addr1, addr2, typ}
if visited[v] {
return true
}
// Remember for later.
visited[v] = true
if compareAddr(v1, v2, visited) {
return true
}
switch v1.Kind() {
@@ -77,27 +50,7 @@ func deepValueEqual(v1, v2 reflect.Value, visited map[visit]bool, depth int) boo
}
return true
case reflect.Slice:
if v1.IsNil() && v2.Len() == 0 {
return true
}
if v1.Len() == 0 && v2.IsNil() {
return true
}
if v1.IsNil() && v2.IsNil() {
return true
}
if v1.Len() != v2.Len() {
return false
}
if v1.Pointer() == v2.Pointer() {
return true
}
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
return compareSlice(v1, v2, visited, depth)
case reflect.Interface:
if v1.IsNil() || v2.IsNil() {
return v1.IsNil() == v2.IsNil()
@@ -120,13 +73,7 @@ func deepValueEqual(v1, v2 reflect.Value, visited map[visit]bool, depth int) boo
}
}
func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, depth int) bool {
if !v1.IsValid() || !v2.IsValid() {
return v1.IsValid() == v2.IsValid()
}
if v1.Type() != v2.Type() {
return false
}
func compareAddr(v1, v2 reflect.Value, visited map[visit]bool) bool {
// We want to avoid putting more in the visited map than we need to.
// For any possible reference cycle that might be encountered,
// hard(t) needs to return true for at least one of the types in the cycle.
@@ -141,6 +88,7 @@ func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, de
if v1.CanAddr() && v2.CanAddr() && hard(v1.Kind()) {
addr1 := unsafe.Pointer(v1.UnsafeAddr()) // #nosec G103 -- Test compare only
addr2 := unsafe.Pointer(v2.UnsafeAddr()) // #nosec G103 -- Test compare only
if uintptr(addr1) > uintptr(addr2) {
// Canonicalize order to reduce number of entries in visited.
// Assumes non-moving garbage collector.
@@ -157,6 +105,20 @@ func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, de
// Remember for later.
visited[v] = true
}
return false
}
func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, depth int) bool {
if !v1.IsValid() || !v2.IsValid() {
return v1.IsValid() == v2.IsValid()
}
if v1.Type() != v2.Type() {
return false
}
if compareAddr(v1, v2, visited) {
return true
}
switch v1.Kind() {
case reflect.Array:
@@ -216,6 +178,30 @@ func deepValueEqualExportedOnly(v1, v2 reflect.Value, visited map[visit]bool, de
}
}
func compareSlice(v1, v2 reflect.Value, visited map[visit]bool, depth int) bool {
if v1.IsNil() && v2.Len() == 0 {
return true
}
if v1.Len() == 0 && v2.IsNil() {
return true
}
if v1.IsNil() && v2.IsNil() {
return true
}
if v1.Len() != v2.Len() {
return false
}
if v1.Pointer() == v2.Pointer() {
return true
}
for i := 0; i < v1.Len(); i++ {
if !deepValueEqual(v1.Index(i), v2.Index(i), visited, depth+1) {
return false
}
}
return true
}
func deepValueBaseTypeEqual(v1, v2 reflect.Value) bool {
switch v1.Kind() {
case reflect.String:

View File

@@ -93,57 +93,7 @@ func run(pass *analysis.Pass) (interface{}, error) {
reportUnhandledError(pass, stmt.Call.Lparen, stmt.Call)
}
case *ast.AssignStmt:
if len(stmt.Rhs) == 1 {
// single value on rhs; check against lhs identifiers
if call, ok := stmt.Rhs[0].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
break
}
isError := errorsByArg(pass, call)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
// We shortcut calls to recover() because errorsByArg can't
// check its return types for errors since it returns interface{}.
if id.Name == "_" && (isRecover(pass, call) || isError[i]) {
reportUnhandledError(pass, id.NamePos, call)
}
}
}
} else if assert, ok := stmt.Rhs[0].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// type switch
break
}
if len(stmt.Lhs) < 2 {
// assertion result not read
reportUnhandledTypeAssertion(pass, stmt.Rhs[0].Pos())
} else if id, ok := stmt.Lhs[1].(*ast.Ident); ok && id.Name == "_" {
// assertion result ignored
reportUnhandledTypeAssertion(pass, id.NamePos)
}
}
} else {
// multiple value on rhs; in this case a call can't return
// multiple values. Assume len(stmt.Lhs) == len(stmt.Rhs)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
if call, ok := stmt.Rhs[i].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
continue
}
if id.Name == "_" && callReturnsError(pass, call) {
reportUnhandledError(pass, id.NamePos, call)
}
} else if assert, ok := stmt.Rhs[i].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// Shouldn't happen anyway, no multi assignment in type switches
continue
}
reportUnhandledError(pass, id.NamePos, nil)
}
}
}
}
inspectAssignStatement(pass, stmt)
default:
}
})
@@ -151,6 +101,59 @@ func run(pass *analysis.Pass) (interface{}, error) {
return nil, nil
}
func inspectAssignStatement(pass *analysis.Pass, stmt *ast.AssignStmt) {
if len(stmt.Rhs) == 1 {
// single value on rhs; check against lhs identifiers
if call, ok := stmt.Rhs[0].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
return
}
isError := errorsByArg(pass, call)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
// We shortcut calls to recover() because errorsByArg can't
// check its return types for errors since it returns interface{}.
if id.Name == "_" && (isRecover(pass, call) || isError[i]) {
reportUnhandledError(pass, id.NamePos, call)
}
}
}
} else if assert, ok := stmt.Rhs[0].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
return
}
if len(stmt.Lhs) < 2 {
// assertion result not read
reportUnhandledTypeAssertion(pass, stmt.Rhs[0].Pos())
} else if id, ok := stmt.Lhs[1].(*ast.Ident); ok && id.Name == "_" {
// assertion result ignored
reportUnhandledTypeAssertion(pass, id.NamePos)
}
}
} else {
// multiple value on rhs; in this case a call can't return
// multiple values. Assume len(stmt.Lhs) == len(stmt.Rhs)
for i := 0; i < len(stmt.Lhs); i++ {
if id, ok := stmt.Lhs[i].(*ast.Ident); ok {
if call, ok := stmt.Rhs[i].(*ast.CallExpr); ok {
if ignoreCall(pass, call) {
continue
}
if id.Name == "_" && callReturnsError(pass, call) {
reportUnhandledError(pass, id.NamePos, call)
}
} else if assert, ok := stmt.Rhs[i].(*ast.TypeAssertExpr); ok {
if assert.Type == nil {
// Shouldn't happen anyway, no multi assignment in type switches
continue
}
reportUnhandledError(pass, id.NamePos, nil)
}
}
}
}
}
func reportUnhandledError(pass *analysis.Pass, pos token.Pos, call *ast.CallExpr) {
pass.Reportf(pos, "Unhandled error for function call %s", fullName(pass, call))
}

View File

@@ -257,128 +257,10 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot types.S
v.voteStats.startEpoch = prevEpoch
}
}
gweiPerEth := float64(params.BeaconConfig().GweiPerEth)
v.prevBalanceLock.Lock()
for i, pubKey := range resp.PublicKeys {
truncatedKey := fmt.Sprintf("%#x", bytesutil.Trunc(pubKey))
pubKeyBytes := bytesutil.ToBytes48(pubKey)
if slot < params.BeaconConfig().SlotsPerEpoch {
v.prevBalance[pubKeyBytes] = params.BeaconConfig().MaxEffectiveBalance
}
// Safely load data from response with slice out of bounds checks. The server should return
// the response with all slices of equal length, but the validator could panic if the server
// did not do so for whatever reason.
var balBeforeEpoch uint64
var balAfterEpoch uint64
var correctlyVotedSource bool
var correctlyVotedTarget bool
var correctlyVotedHead bool
if i < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing balance before epoch transition")
}
if i < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[i]
} else {
}
if i < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted source")
}
if i < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted target")
}
if i < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing correctly voted head")
}
if _, ok := v.startBalances[pubKeyBytes]; !ok {
v.startBalances[pubKeyBytes] = balBeforeEpoch
}
fmtKey := fmt.Sprintf("%#x", pubKey)
if v.prevBalance[pubKeyBytes] > 0 {
newBalance := float64(balAfterEpoch) / gweiPerEth
prevBalance := float64(balBeforeEpoch) / gweiPerEth
startBalance := float64(v.startBalances[pubKeyBytes]) / gweiPerEth
percentNet := (newBalance - prevBalance) / prevBalance
percentSinceStart := (newBalance - startBalance) / startBalance
previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
"correctlyVotedHead": correctlyVotedHead,
"startBalance": startBalance,
"oldBalance": prevBalance,
"newBalance": newBalance,
"percentChange": fmt.Sprintf("%.5f%%", percentNet*100),
"percentChangeSinceStart": fmt.Sprintf("%.5f%%", percentSinceStart*100),
}
// These fields are deprecated after Altair.
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionSlots) {
previousEpochSummaryFields["inclusionSlot"] = resp.InclusionSlots[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inclusion slot")
}
if i < len(resp.InclusionDistances) {
previousEpochSummaryFields["inclusionDistance"] = resp.InclusionDistances[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inclusion distance")
}
}
if slots.ToEpoch(slot) >= params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[i]
} else {
log.WithField("pubKey", truncatedKey).Warn("Missing inactivity score")
}
}
log.WithFields(previousEpochSummaryFields).Info("Previous epoch voting summary")
if v.emitAccountMetrics {
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(newBalance)
if correctlyVotedSource {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedTarget {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedHead {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(0)
}
// Phase0 specific metrics
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionDistances) {
ValidatorInclusionDistancesGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InclusionDistances[i]))
}
} else { // Altair specific metrics.
// Reset phase0 fields that no longer apply
ValidatorInclusionDistancesGaugeVec.DeleteLabelValues(fmtKey)
if i < len(resp.InactivityScores) {
ValidatorInactivityScoreGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InactivityScores[i]))
}
}
}
}
v.prevBalance[pubKeyBytes] = balBeforeEpoch
v.logValidatorGainsAndLossesPerKey(slot, prevEpoch, pubKey, i, resp)
}
v.prevBalanceLock.Unlock()
@@ -386,6 +268,131 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot types.S
return nil
}
// logValidatorGainsAndLossesPerKey requires the v.prevBalanceLock to be held prior to calling this method.
func (v *validator) logValidatorGainsAndLossesPerKey(slot types.Slot, prevEpoch types.Epoch, pubKey []byte, i int, resp *ethpb.ValidatorPerformanceResponse) {
gweiPerEth := float64(params.BeaconConfig().GweiPerEth)
truncatedKey := fmt.Sprintf("%#x", bytesutil.Trunc(pubKey))
pubKeyBytes := bytesutil.ToBytes48(pubKey)
if slot < params.BeaconConfig().SlotsPerEpoch {
v.prevBalance[pubKeyBytes] = params.BeaconConfig().MaxEffectiveBalance
}
log := log.WithField("pubKey", truncatedKey)
// Safely load data from response with slice out of bounds checks. The server should return
// the response with all slices of equal length, but the validator could panic if the server
// did not do so for whatever reason.
var balBeforeEpoch uint64
var balAfterEpoch uint64
var correctlyVotedSource bool
var correctlyVotedTarget bool
var correctlyVotedHead bool
if i < len(resp.BalancesBeforeEpochTransition) {
balBeforeEpoch = resp.BalancesBeforeEpochTransition[i]
} else {
log.Warn("Missing balance before epoch transition")
}
if i < len(resp.BalancesAfterEpochTransition) {
balAfterEpoch = resp.BalancesAfterEpochTransition[i]
}
if i < len(resp.CorrectlyVotedSource) {
correctlyVotedSource = resp.CorrectlyVotedSource[i]
} else {
log.Warn("Missing correctly voted source")
}
if i < len(resp.CorrectlyVotedTarget) {
correctlyVotedTarget = resp.CorrectlyVotedTarget[i]
} else {
log.Warn("Missing correctly voted target")
}
if i < len(resp.CorrectlyVotedHead) {
correctlyVotedHead = resp.CorrectlyVotedHead[i]
} else {
log.Warn("Missing correctly voted head")
}
if _, ok := v.startBalances[pubKeyBytes]; !ok {
v.startBalances[pubKeyBytes] = balBeforeEpoch
}
fmtKey := fmt.Sprintf("%#x", pubKey)
if v.prevBalance[pubKeyBytes] > 0 {
newBalance := float64(balAfterEpoch) / gweiPerEth
prevBalance := float64(balBeforeEpoch) / gweiPerEth
startBalance := float64(v.startBalances[pubKeyBytes]) / gweiPerEth
percentNet := (newBalance - prevBalance) / prevBalance
percentSinceStart := (newBalance - startBalance) / startBalance
previousEpochSummaryFields := logrus.Fields{
"pubKey": truncatedKey,
"epoch": prevEpoch,
"correctlyVotedSource": correctlyVotedSource,
"correctlyVotedTarget": correctlyVotedTarget,
"correctlyVotedHead": correctlyVotedHead,
"startBalance": startBalance,
"oldBalance": prevBalance,
"newBalance": newBalance,
"percentChange": fmt.Sprintf("%.5f%%", percentNet*100),
"percentChangeSinceStart": fmt.Sprintf("%.5f%%", percentSinceStart*100),
}
// These fields are deprecated after Altair.
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionSlots) {
previousEpochSummaryFields["inclusionSlot"] = resp.InclusionSlots[i]
} else {
log.Warn("Missing inclusion slot")
}
if i < len(resp.InclusionDistances) {
previousEpochSummaryFields["inclusionDistance"] = resp.InclusionDistances[i]
} else {
log.Warn("Missing inclusion distance")
}
}
if slots.ToEpoch(slot) >= params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InactivityScores) {
previousEpochSummaryFields["inactivityScore"] = resp.InactivityScores[i]
} else {
log.Warn("Missing inactivity score")
}
}
log.WithFields(previousEpochSummaryFields).Info("Previous epoch voting summary")
if v.emitAccountMetrics {
ValidatorBalancesGaugeVec.WithLabelValues(fmtKey).Set(newBalance)
if correctlyVotedSource {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedSourceGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedTarget {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedTargetGaugeVec.WithLabelValues(fmtKey).Set(0)
}
if correctlyVotedHead {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(1)
} else {
ValidatorCorrectlyVotedHeadGaugeVec.WithLabelValues(fmtKey).Set(0)
}
// Phase0 specific metrics
if slots.ToEpoch(slot) < params.BeaconConfig().AltairForkEpoch {
if i < len(resp.InclusionDistances) {
ValidatorInclusionDistancesGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InclusionDistances[i]))
}
} else { // Altair specific metrics.
// Reset phase0 fields that no longer apply
ValidatorInclusionDistancesGaugeVec.DeleteLabelValues(fmtKey)
if i < len(resp.InactivityScores) {
ValidatorInactivityScoreGaugeVec.WithLabelValues(fmtKey).Set(float64(resp.InactivityScores[i]))
}
}
}
}
v.prevBalance[pubKeyBytes] = balBeforeEpoch
}
// UpdateLogAggregateStats updates and logs the voteStats struct of a validator using the RPC response obtained from LogValidatorGainsAndLosses.
func (v *validator) UpdateLogAggregateStats(resp *ethpb.ValidatorPerformanceResponse, slot types.Slot) {
summary := &v.voteStats