Better attesting summary reporting (#4845)

This commit is contained in:
terence tsao
2020-02-12 13:38:19 -08:00
committed by GitHub
parent 090d9627fe
commit 16fef1c658
11 changed files with 82 additions and 59 deletions

View File

@@ -1272,7 +1272,7 @@ go_repository(
go_repository(
name = "com_github_prysmaticlabs_ethereumapis",
commit = "6720aaf759152b73b1162d7c2156ecc4e788a8d3",
commit = "9e66dfce9956682649fedac730eb5aa2d5973456",
importpath = "github.com/prysmaticlabs/ethereumapis",
patch_args = ["-p1"],
patches = [

View File

@@ -24,6 +24,10 @@ import (
"go.opencensus.io/trace"
)
// ValidatorSummary tracks validator's attesting summary on per epoch basis. This
// gets updated during epoch transition.
var ValidatorSummary []*precompute.Validator
// ExecuteStateTransition defines the procedure for a state transition function.
//
// Spec pseudocode definition:
@@ -595,6 +599,8 @@ func ProcessEpochPrecompute(ctx context.Context, state *stateTrie.BeaconState) (
return nil, err
}
ValidatorSummary = vp
state, err = precompute.ProcessJustificationAndFinalizationPreCompute(state, bp)
if err != nil {
return nil, errors.Wrap(err, "could not process justification")

View File

@@ -194,7 +194,7 @@ func (bs *Server) StreamBlocks(_ *ptypes.Empty, stream ethpb.BeaconChain_StreamB
// One nil block shouldn't stop the stream.
continue
}
if err := stream.Send(data.SignedBlock.Block); err != nil {
if err := stream.Send(data.SignedBlock); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}

View File

@@ -639,7 +639,7 @@ func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Send(b.Block).Do(func(arg0 interface{}) {
mockStream.EXPECT().Send(b).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()

View File

@@ -581,51 +581,45 @@ func (bs *Server) GetValidatorQueue(
func (bs *Server) GetValidatorPerformance(
ctx context.Context, req *ethpb.ValidatorPerformanceRequest,
) (*ethpb.ValidatorPerformanceResponse, error) {
headState, err := bs.HeadFetcher.HeadState(ctx)
if err != nil {
return nil, status.Error(codes.Internal, "Could not get head state")
}
// Advance state with empty transitions up to the requested epoch start slot.
if req.Slot > headState.Slot() {
headState, err = state.ProcessSlots(ctx, headState, req.Slot)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
}
}
validatorSummary := state.ValidatorSummary
balances := make([]uint64, len(req.PublicKeys))
effectiveBalances := make([]uint64, 0)
inclusionSlots := make([]uint64, 0)
inclusionDistances := make([]uint64, 0)
correctlyVotedSource := make([]bool, 0)
correctlyVotedTarget := make([]bool, 0)
correctlyVotedHead := make([]bool, 0)
missingValidators := make([][]byte, 0)
for i, key := range req.PublicKeys {
index, ok, err := bs.BeaconDB.ValidatorIndex(ctx, key)
if err != nil || !ok {
// Convert the list of validator public keys to list of validator indices.
// Also track missing validators using public keys.
for _, key := range req.PublicKeys {
idx, ok, err := bs.BeaconDB.ValidatorIndex(ctx, key)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not fetch validator idx for public key %#x: %v", key, err)
}
if !ok {
missingValidators = append(missingValidators, key)
balances[i] = 0
continue
}
balances[i], err = headState.BalanceAtIndex(index)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get balance at index %d", i)
}
effectiveBalances = append(effectiveBalances, validatorSummary[idx].CurrentEpochEffectiveBalance)
inclusionSlots = append(inclusionSlots, validatorSummary[idx].InclusionSlot)
inclusionDistances = append(inclusionDistances, validatorSummary[idx].InclusionDistance)
correctlyVotedSource = append(correctlyVotedSource, validatorSummary[idx].IsPrevEpochAttester)
correctlyVotedTarget = append(correctlyVotedTarget, validatorSummary[idx].IsPrevEpochTargetAttester)
correctlyVotedHead = append(correctlyVotedHead, validatorSummary[idx].IsPrevEpochHeadAttester)
}
activeCount, err := helpers.ActiveValidatorCount(headState, helpers.SlotToEpoch(req.Slot))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve active validator count: %v", err)
}
totalActiveBalance, err := helpers.TotalActiveBalance(headState)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not retrieve total active balance: %v", err)
}
avgBalance := float32(totalActiveBalance / activeCount)
return &ethpb.ValidatorPerformanceResponse{
Balances: balances,
AverageActiveValidatorBalance: avgBalance,
MissingValidators: missingValidators,
TotalValidators: uint64(headState.NumValidators()),
TotalActiveValidators: activeCount,
InclusionSlots: inclusionSlots,
InclusionDistances: inclusionDistances,
CorrectlyVotedSource: correctlyVotedSource,
CorrectlyVotedTarget: correctlyVotedTarget,
CorrectlyVotedHead: correctlyVotedHead,
Balances: effectiveBalances,
MissingValidators: missingValidators,
}, nil
}

View File

@@ -303,7 +303,7 @@ func (mr *MockBeaconChain_StreamBlocksServerMockRecorder) RecvMsg(arg0 interface
}
// Send mocks base method
func (m *MockBeaconChain_StreamBlocksServer) Send(arg0 *v1alpha1.BeaconBlock) error {
func (m *MockBeaconChain_StreamBlocksServer) Send(arg0 *v1alpha1.SignedBeaconBlock) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Send", arg0)
ret0, _ := ret[0].(error)

View File

@@ -196,7 +196,7 @@ var (
Hidden: true,
}
deprecatedForkchoiceAggregateAttestations = cli.BoolFlag{
Name: "forkchoice-aggregate-attestations",
Name: "forkchoice-aggregate-attestations",
Usage: deprecatedUsage,
Hidden: true,
}

View File

@@ -587,10 +587,10 @@ func (mr *MockBeaconChain_StreamBlocksClientMockRecorder) Header() *gomock.Call
}
// Recv mocks base method
func (m *MockBeaconChain_StreamBlocksClient) Recv() (*v1alpha1.BeaconBlock, error) {
func (m *MockBeaconChain_StreamBlocksClient) Recv() (*v1alpha1.SignedBeaconBlock, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Recv")
ret0, _ := ret[0].(*v1alpha1.BeaconBlock)
ret0, _ := ret[0].(*v1alpha1.SignedBeaconBlock)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View File

@@ -22,16 +22,13 @@ func TestService_ReceiveBlocks(t *testing.T) {
}
stream := mock.NewMockBeaconChain_StreamBlocksClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
block := &ethpb.BeaconBlock{
Slot: 5,
}
client.EXPECT().StreamBlocks(
gomock.Any(),
&ptypes.Empty{},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
stream.EXPECT().Recv().Return(
block,
&ethpb.SignedBeaconBlock{},
nil,
).Do(func() {
cancel()

View File

@@ -262,7 +262,7 @@ index 2ce5c34..4cbb276 100644
+ bytes signature = 3 [(gogoproto.moretags) = "ssz-size:\"96\""];
}
diff --git a/eth/v1alpha1/beacon_chain.proto b/eth/v1alpha1/beacon_chain.proto
index 8de1adb..ffcc8f4 100644
index 731fa45..3a9092c 100644
--- a/eth/v1alpha1/beacon_chain.proto
+++ b/eth/v1alpha1/beacon_chain.proto
@@ -15,6 +15,7 @@ syntax = "proto3";
@@ -369,7 +369,7 @@ index 8de1adb..ffcc8f4 100644
// Indices of validators ejected in the given epoch.
repeated uint64 ejected_indices = 9;
@@ -583,11 +583,11 @@ message ValidatorQueue {
@@ -591,11 +591,11 @@ message ValidatorQueue {
// Ordered list of 48 byte public keys awaiting activation. 0th index is the
// next key to be processed.
@@ -383,7 +383,7 @@ index 8de1adb..ffcc8f4 100644
}
message ListValidatorAssignmentsRequest {
@@ -599,7 +599,7 @@ message ListValidatorAssignmentsRequest {
@@ -607,7 +607,7 @@ message ListValidatorAssignmentsRequest {
bool genesis = 2;
}
// 48 byte validator public keys to filter assignments for the given epoch.
@@ -392,7 +392,7 @@ index 8de1adb..ffcc8f4 100644
// Validator indicies to filter assignments for the given epoch.
repeated uint64 indices = 4;
@@ -634,7 +634,7 @@ message ValidatorAssignments {
@@ -642,7 +642,7 @@ message ValidatorAssignments {
uint64 proposer_slot = 4;
// 48 byte BLS public key.

View File

@@ -45,7 +45,6 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot uint64)
pubKeys := bytesutil.FromBytes48Array(pks)
req := &ethpb.ValidatorPerformanceRequest{
Slot: slot,
PublicKeys: pubKeys,
}
resp, err := v.beaconClient.GetValidatorPerformance(ctx, req)
@@ -57,6 +56,12 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot uint64)
for _, val := range resp.MissingValidators {
missingValidators[bytesutil.ToBytes48(val)] = true
}
included := 0
votedSource := 0
votedTarget := 0
votedHead := 0
for i, pkey := range pubKeys {
pubKey := fmt.Sprintf("%#x", pkey[:8])
log := log.WithField("pubKey", pubKey)
@@ -73,20 +78,41 @@ func (v *validator) LogValidatorGainsAndLosses(ctx context.Context, slot uint64)
newBalance := float64(resp.Balances[i]) / float64(params.BeaconConfig().GweiPerEth)
if v.prevBalance[bytesutil.ToBytes48(pkey)] > 0 {
prevBalance := float64(v.prevBalance[bytesutil.ToBytes48(pkey)]) / float64(params.BeaconConfig().GweiPerEth)
percentNet := (newBalance - prevBalance) / prevBalance
log.WithFields(logrus.Fields{
"epoch": (slot / params.BeaconConfig().SlotsPerEpoch) - 1,
"prevBalance": prevBalance,
"newBalance": newBalance,
"percentChange": fmt.Sprintf("%.5f%%", percentNet*100),
}).Info("New Balance")
"epoch": (slot / params.BeaconConfig().SlotsPerEpoch) - 1,
"correctlyVotedSource": resp.CorrectlyVotedSource[i],
"correctlyVotedTarget": resp.CorrectlyVotedTarget[i],
"correctlyVotedHead": resp.CorrectlyVotedHead[i],
"inclusionSlot": resp.InclusionSlots[i],
"inclusionDistance": resp.InclusionDistances[i],
}).Info("Previous epoch voting summary")
if v.emitAccountMetrics {
validatorBalancesGaugeVec.WithLabelValues(pubKey).Set(newBalance)
}
}
if resp.InclusionSlots[i] != ^uint64(0) {
included++
}
if resp.CorrectlyVotedSource[i] {
votedSource++
}
if resp.CorrectlyVotedTarget[i] {
votedTarget++
}
if resp.CorrectlyVotedHead[i] {
votedHead++
}
v.prevBalance[bytesutil.ToBytes48(pkey)] = resp.Balances[i]
}
log.WithFields(logrus.Fields{
"epoch": (slot / params.BeaconConfig().SlotsPerEpoch) - 1,
"attestationInclusionPercentage": fmt.Sprintf("%.2f", float64(included)/float64(len(resp.InclusionSlots))),
"correctlyVotedSourcePercentage": fmt.Sprintf("%.2f", float64(votedSource)/float64(len(resp.CorrectlyVotedSource))),
"correctlyVotedTargetPercentage": fmt.Sprintf("%.2f", float64(votedTarget)/float64(len(resp.CorrectlyVotedTarget))),
"correctlyVotedHeadPercentage": fmt.Sprintf("%.2f", float64(votedHead)/float64(len(resp.CorrectlyVotedHead))),
}).Info("Previous epoch aggregated voting summary")
return nil
}