mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
Monitor attestations (#9901)
Log attestation performance on the validator monitor
This commit is contained in:
@@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"process_attestation.go",
|
||||
"process_block.go",
|
||||
"process_exit.go",
|
||||
"service.go",
|
||||
@@ -11,10 +12,18 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor",
|
||||
visibility = ["//beacon-chain:__subpackages__"],
|
||||
deps = [
|
||||
"//beacon-chain/core/altair:go_default_library",
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//beacon-chain/state/stategen:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//proto/prysm/v1alpha1/block:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
],
|
||||
@@ -23,18 +32,24 @@ go_library(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"process_attestation_test.go",
|
||||
"process_block_test.go",
|
||||
"process_exit_test.go",
|
||||
"service_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//beacon-chain/db/testing:go_default_library",
|
||||
"//beacon-chain/state/stategen:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/wrapper:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
213
beacon-chain/monitor/process_attestation.go
Normal file
213
beacon-chain/monitor/process_attestation.go
Normal file
@@ -0,0 +1,213 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/config/params"
|
||||
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/attestation"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
|
||||
"github.com/prysmaticlabs/prysm/runtime/version"
|
||||
"github.com/prysmaticlabs/prysm/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// updatedPerformanceFromTrackedVal returns true if the validator is tracked and if the
|
||||
// given slot is different than the last attested slot from this validator.
|
||||
func (s *Service) updatedPerformanceFromTrackedVal(idx types.ValidatorIndex, slot types.Slot) bool {
|
||||
if !s.TrackedIndex(types.ValidatorIndex(idx)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if lp, ok := s.latestPerformance[types.ValidatorIndex(idx)]; ok {
|
||||
return lp.attestedSlot != slot
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// attestingIndices returns the indices of validators that appear in the
|
||||
// given aggregated atestation.
|
||||
func attestingIndices(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) ([]uint64, error) {
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, state, att.Data.Slot, att.Data.CommitteeIndex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return attestation.AttestingIndices(att.AggregationBits, committee)
|
||||
}
|
||||
|
||||
// logMessageTimelyFlagsForIndex returns the log message with the basic
|
||||
// performance indicators for the attestation (head, source, target)
|
||||
func logMessageTimelyFlagsForIndex(idx types.ValidatorIndex, data *ethpb.AttestationData) logrus.Fields {
|
||||
return logrus.Fields{
|
||||
"ValidatorIndex": idx,
|
||||
"Slot": data.Slot,
|
||||
"Source": fmt.Sprintf("%#x", bytesutil.Trunc(data.Source.Root)),
|
||||
"Target": fmt.Sprintf("%#x", bytesutil.Trunc(data.Target.Root)),
|
||||
"Head": fmt.Sprintf("%#x", bytesutil.Trunc(data.BeaconBlockRoot)),
|
||||
}
|
||||
}
|
||||
|
||||
// processAttestations logs the event that one of our tracked validators'
|
||||
// attestations was included in a block
|
||||
func (s *Service) processAttestations(ctx context.Context, state state.BeaconState, blk block.BeaconBlock) {
|
||||
if blk == nil || blk.Body() == nil {
|
||||
return
|
||||
}
|
||||
for _, attestation := range blk.Body().Attestations() {
|
||||
s.processIncludedAttestation(ctx, state, attestation)
|
||||
}
|
||||
}
|
||||
|
||||
// processIncludedAttestation logs in the event that one of our tracked validators'
|
||||
// appears in the attesting indices and this included attestation was not included
|
||||
// before.
|
||||
func (s *Service) processIncludedAttestation(ctx context.Context, state state.BeaconState, att *ethpb.Attestation) {
|
||||
attestingIndices, err := attestingIndices(ctx, state, att)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get attesting indices")
|
||||
return
|
||||
}
|
||||
for _, idx := range attestingIndices {
|
||||
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
|
||||
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
|
||||
balance, err := state.BalanceAtIndex(types.ValidatorIndex(idx))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get balance")
|
||||
return
|
||||
}
|
||||
|
||||
aggregatedPerf := s.aggregatedPerformance[types.ValidatorIndex(idx)]
|
||||
aggregatedPerf.totalAttestedCount++
|
||||
aggregatedPerf.totalRequestedCount++
|
||||
|
||||
latestPerf := s.latestPerformance[types.ValidatorIndex(idx)]
|
||||
balanceChg := balance - latestPerf.balance
|
||||
latestPerf.balanceChange = balanceChg
|
||||
latestPerf.balance = balance
|
||||
latestPerf.attestedSlot = att.Data.Slot
|
||||
latestPerf.inclusionSlot = state.Slot()
|
||||
aggregatedPerf.totalDistance += uint64(latestPerf.inclusionSlot - latestPerf.attestedSlot)
|
||||
|
||||
if state.Version() == version.Altair {
|
||||
targetIdx := params.BeaconConfig().TimelyTargetFlagIndex
|
||||
sourceIdx := params.BeaconConfig().TimelySourceFlagIndex
|
||||
headIdx := params.BeaconConfig().TimelyHeadFlagIndex
|
||||
|
||||
var participation []byte
|
||||
if slots.ToEpoch(latestPerf.inclusionSlot) ==
|
||||
slots.ToEpoch(latestPerf.attestedSlot) {
|
||||
participation, err = state.CurrentEpochParticipation()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get current epoch participation")
|
||||
return
|
||||
}
|
||||
} else {
|
||||
participation, err = state.PreviousEpochParticipation()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get previous epoch participation")
|
||||
return
|
||||
}
|
||||
}
|
||||
flags := participation[idx]
|
||||
hasFlag, err := altair.HasValidatorFlag(flags, sourceIdx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get timely Source flag")
|
||||
return
|
||||
}
|
||||
latestPerf.timelySource = hasFlag
|
||||
hasFlag, err = altair.HasValidatorFlag(flags, headIdx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get timely Head flag")
|
||||
return
|
||||
}
|
||||
latestPerf.timelyHead = hasFlag
|
||||
hasFlag, err = altair.HasValidatorFlag(flags, targetIdx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get timely Target flag")
|
||||
return
|
||||
}
|
||||
latestPerf.timelyTarget = hasFlag
|
||||
|
||||
if latestPerf.timelySource {
|
||||
aggregatedPerf.totalCorrectSource++
|
||||
}
|
||||
if latestPerf.timelyHead {
|
||||
aggregatedPerf.totalCorrectHead++
|
||||
}
|
||||
if latestPerf.timelyTarget {
|
||||
aggregatedPerf.totalCorrectTarget++
|
||||
}
|
||||
}
|
||||
logFields["CorrectHead"] = latestPerf.timelyHead
|
||||
logFields["CorrectSource"] = latestPerf.timelySource
|
||||
logFields["CorrectTarget"] = latestPerf.timelyTarget
|
||||
logFields["InclusionSlot"] = latestPerf.inclusionSlot
|
||||
logFields["NewBalance"] = balance
|
||||
logFields["BalanceChange"] = balanceChg
|
||||
|
||||
s.latestPerformance[types.ValidatorIndex(idx)] = latestPerf
|
||||
s.aggregatedPerformance[types.ValidatorIndex(idx)] = aggregatedPerf
|
||||
log.WithFields(logFields).Info("Attestation included")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processUnaggregatedAttestation logs when the beacon node sees an unaggregated attestation from one of our
|
||||
// tracked validators
|
||||
func (s *Service) processUnaggregatedAttestation(ctx context.Context, att *ethpb.Attestation) {
|
||||
root := bytesutil.ToBytes32(att.Data.BeaconBlockRoot)
|
||||
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
|
||||
if state == nil {
|
||||
log.Debug("Skipping unaggregated attestation due to state not found in cache")
|
||||
return
|
||||
}
|
||||
attestingIndices, err := attestingIndices(ctx, state, att)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get attesting indices")
|
||||
return
|
||||
}
|
||||
for _, idx := range attestingIndices {
|
||||
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Data.Slot) {
|
||||
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Data)
|
||||
log.WithFields(logFields).Info("Processed unaggregated attestation")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processAggregatedAttestation logs when we see an aggregation from one of our tracked validators or an aggregated
|
||||
// attestation from one of our tracked validators
|
||||
func (s *Service) processAggregatedAttestation(ctx context.Context, att *ethpb.AggregateAttestationAndProof) {
|
||||
if s.TrackedIndex(att.AggregatorIndex) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"ValidatorIndex": att.AggregatorIndex,
|
||||
}).Info("Processed attestation aggregation")
|
||||
aggregatedPerf := s.aggregatedPerformance[att.AggregatorIndex]
|
||||
aggregatedPerf.totalAggregations++
|
||||
s.aggregatedPerformance[att.AggregatorIndex] = aggregatedPerf
|
||||
}
|
||||
|
||||
var root [32]byte
|
||||
copy(root[:], att.Aggregate.Data.BeaconBlockRoot)
|
||||
state := s.config.StateGen.StateByRootIfCachedNoCopy(root)
|
||||
if state == nil {
|
||||
log.Debug("Skipping agregated attestation due to state not found in cache")
|
||||
return
|
||||
}
|
||||
attestingIndices, err := attestingIndices(ctx, state, att.Aggregate)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not get attesting indices")
|
||||
return
|
||||
}
|
||||
for _, idx := range attestingIndices {
|
||||
if s.updatedPerformanceFromTrackedVal(types.ValidatorIndex(idx), att.Aggregate.Data.Slot) {
|
||||
logFields := logMessageTimelyFlagsForIndex(types.ValidatorIndex(idx), att.Aggregate.Data)
|
||||
log.WithFields(logFields).Info("Processed aggregated attestation")
|
||||
}
|
||||
}
|
||||
}
|
||||
287
beacon-chain/monitor/process_attestation_test.go
Normal file
287
beacon-chain/monitor/process_attestation_test.go
Normal file
@@ -0,0 +1,287 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
"github.com/prysmaticlabs/go-bitfield"
|
||||
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
||||
"github.com/prysmaticlabs/prysm/encoding/bytesutil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
|
||||
"github.com/prysmaticlabs/prysm/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/testing/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func setupService(t *testing.T) *Service {
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
|
||||
trackedVals := map[types.ValidatorIndex]interface{}{
|
||||
1: nil,
|
||||
2: nil,
|
||||
12: nil,
|
||||
}
|
||||
latestPerformance := map[types.ValidatorIndex]ValidatorLatestPerformance{
|
||||
1: {
|
||||
balance: 32000000000,
|
||||
},
|
||||
2: {
|
||||
balance: 32000000000,
|
||||
},
|
||||
12: {
|
||||
balance: 31900000000,
|
||||
},
|
||||
}
|
||||
|
||||
aggregatedPerformance := map[types.ValidatorIndex]ValidatorAggregatedPerformance{
|
||||
1: {},
|
||||
2: {},
|
||||
12: {},
|
||||
}
|
||||
|
||||
return &Service{
|
||||
config: &ValidatorMonitorConfig{
|
||||
StateGen: stategen.New(beaconDB),
|
||||
TrackedValidators: trackedVals,
|
||||
},
|
||||
latestPerformance: latestPerformance,
|
||||
aggregatedPerformance: aggregatedPerformance,
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAttestingIndices(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconState, _ := util.DeterministicGenesisState(t, 256)
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
}
|
||||
attestingIndices, err := attestingIndices(ctx, beaconState, att)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, attestingIndices, []uint64{0xc, 0x2})
|
||||
|
||||
}
|
||||
|
||||
func TestProcessIncludedAttestationTwoTracked(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
s := setupService(t)
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
require.NoError(t, state.SetSlot(2))
|
||||
require.NoError(t, state.SetCurrentParticipationBits(bytes.Repeat([]byte{0xff}, 13)))
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
}
|
||||
s.processIncludedAttestation(context.Background(), state, att)
|
||||
wanted1 := "\"Attestation included\" BalanceChange=0 CorrectHead=true CorrectSource=true CorrectTarget=true Head=0x68656c6c6f2d InclusionSlot=2 NewBalance=32000000000 Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor"
|
||||
wanted2 := "\"Attestation included\" BalanceChange=100000000 CorrectHead=true CorrectSource=true CorrectTarget=true Head=0x68656c6c6f2d InclusionSlot=2 NewBalance=32000000000 Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor"
|
||||
require.LogsContain(t, hook, wanted1)
|
||||
require.LogsContain(t, hook, wanted2)
|
||||
}
|
||||
|
||||
func TestProcessUnaggregatedAttestationStateNotCached(t *testing.T) {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
|
||||
s := setupService(t)
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
require.NoError(t, state.SetSlot(2))
|
||||
header := state.LatestBlockHeader()
|
||||
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
||||
require.NoError(t, state.SetCurrentParticipationBits(participation))
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: header.GetStateRoot(),
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
}
|
||||
s.processUnaggregatedAttestation(ctx, att)
|
||||
require.LogsContain(t, hook, "Skipping unaggregated attestation due to state not found in cache")
|
||||
logrus.SetLevel(logrus.InfoLevel)
|
||||
}
|
||||
|
||||
func TestProcessUnaggregatedAttestationStateCached(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
hook := logTest.NewGlobal()
|
||||
|
||||
s := setupService(t)
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
||||
require.NoError(t, state.SetCurrentParticipationBits(participation))
|
||||
|
||||
root := [32]byte{}
|
||||
copy(root[:], "hello-world")
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: root[:],
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: root[:],
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: root[:],
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
}
|
||||
require.NoError(t, s.config.StateGen.SaveState(ctx, root, state))
|
||||
s.processUnaggregatedAttestation(context.Background(), att)
|
||||
wanted1 := "\"Processed unaggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor"
|
||||
wanted2 := "\"Processed unaggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor"
|
||||
require.LogsContain(t, hook, wanted1)
|
||||
require.LogsContain(t, hook, wanted2)
|
||||
}
|
||||
|
||||
func TestProcessAggregatedAttestationStateNotCached(t *testing.T) {
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
|
||||
s := setupService(t)
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
require.NoError(t, state.SetSlot(2))
|
||||
header := state.LatestBlockHeader()
|
||||
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
||||
require.NoError(t, state.SetCurrentParticipationBits(participation))
|
||||
|
||||
att := ðpb.AggregateAttestationAndProof{
|
||||
AggregatorIndex: 2,
|
||||
Aggregate: ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: header.GetStateRoot(),
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
},
|
||||
}
|
||||
s.processAggregatedAttestation(ctx, att)
|
||||
require.LogsContain(t, hook, "\"Processed attestation aggregation\" ValidatorIndex=2 prefix=monitor")
|
||||
require.LogsContain(t, hook, "Skipping agregated attestation due to state not found in cache")
|
||||
logrus.SetLevel(logrus.InfoLevel)
|
||||
}
|
||||
|
||||
func TestProcessAggregatedAttestationStateCached(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
s := setupService(t)
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
participation := []byte{0xff, 0xff, 0x01, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
||||
require.NoError(t, state.SetCurrentParticipationBits(participation))
|
||||
|
||||
root := [32]byte{}
|
||||
copy(root[:], "hello-world")
|
||||
|
||||
att := ðpb.AggregateAttestationAndProof{
|
||||
AggregatorIndex: 2,
|
||||
Aggregate: ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: root[:],
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: root[:],
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: root[:],
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b10, 0b1},
|
||||
},
|
||||
}
|
||||
|
||||
require.NoError(t, s.config.StateGen.SaveState(ctx, root, state))
|
||||
s.processAggregatedAttestation(ctx, att)
|
||||
require.LogsContain(t, hook, "\"Processed attestation aggregation\" ValidatorIndex=2 prefix=monitor")
|
||||
require.LogsContain(t, hook, "\"Processed aggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor")
|
||||
require.LogsDoNotContain(t, hook, "\"Processed aggregated attestation\" Head=0x68656c6c6f2d Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor")
|
||||
}
|
||||
|
||||
func TestProcessAttestations(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
s := setupService(t)
|
||||
ctx := context.Background()
|
||||
state, _ := util.DeterministicGenesisStateAltair(t, 256)
|
||||
require.NoError(t, state.SetSlot(2))
|
||||
require.NoError(t, state.SetCurrentParticipationBits(bytes.Repeat([]byte{0xff}, 13)))
|
||||
|
||||
att := ðpb.Attestation{
|
||||
Data: ðpb.AttestationData{
|
||||
Slot: 1,
|
||||
CommitteeIndex: 0,
|
||||
BeaconBlockRoot: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
Source: ðpb.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
Target: ðpb.Checkpoint{
|
||||
Epoch: 1,
|
||||
Root: bytesutil.PadTo([]byte("hello-world"), 32),
|
||||
},
|
||||
},
|
||||
AggregationBits: bitfield.Bitlist{0b11, 0b1},
|
||||
}
|
||||
|
||||
block := ðpb.BeaconBlockAltair{
|
||||
Slot: 2,
|
||||
Body: ðpb.BeaconBlockBodyAltair{
|
||||
Attestations: []*ethpb.Attestation{att},
|
||||
},
|
||||
}
|
||||
|
||||
wrappedBlock, err := wrapper.WrappedAltairBeaconBlock(block)
|
||||
require.NoError(t, err)
|
||||
s.processAttestations(ctx, state, wrappedBlock)
|
||||
wanted1 := "\"Attestation included\" BalanceChange=0 CorrectHead=true CorrectSource=true CorrectTarget=true Head=0x68656c6c6f2d InclusionSlot=2 NewBalance=32000000000 Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=2 prefix=monitor"
|
||||
wanted2 := "\"Attestation included\" BalanceChange=100000000 CorrectHead=true CorrectSource=true CorrectTarget=true Head=0x68656c6c6f2d InclusionSlot=2 NewBalance=32000000000 Slot=1 Source=0x68656c6c6f2d Target=0x68656c6c6f2d ValidatorIndex=12 prefix=monitor"
|
||||
require.LogsContain(t, hook, wanted1)
|
||||
require.LogsContain(t, hook, wanted2)
|
||||
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package monitor
|
||||
|
||||
import (
|
||||
types "github.com/prysmaticlabs/eth2-types"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@@ -9,17 +10,43 @@ var (
|
||||
log = logrus.WithField("prefix", "monitor")
|
||||
)
|
||||
|
||||
// ValidatorLatestPerformance keeps track of the latest participation of the validator
|
||||
type ValidatorLatestPerformance struct {
|
||||
attestedSlot types.Slot
|
||||
inclusionSlot types.Slot
|
||||
timelySource bool
|
||||
timelyTarget bool
|
||||
timelyHead bool
|
||||
balance uint64
|
||||
balanceChange uint64
|
||||
}
|
||||
|
||||
// ValidatorAggregatedPerformance keeps track of the accumulated performance of
|
||||
// the validator since launch
|
||||
type ValidatorAggregatedPerformance struct {
|
||||
totalAttestedCount uint64
|
||||
totalRequestedCount uint64
|
||||
totalDistance uint64
|
||||
totalCorrectSource uint64
|
||||
totalCorrectTarget uint64
|
||||
totalCorrectHead uint64
|
||||
totalAggregations uint64
|
||||
}
|
||||
|
||||
// ValidatorMonitorConfig contains the list of validator indices that the
|
||||
// monitor service tracks, as well as the event feed notifier that the
|
||||
// monitor needs to subscribe.
|
||||
type ValidatorMonitorConfig struct {
|
||||
StateGen stategen.StateManager
|
||||
TrackedValidators map[types.ValidatorIndex]interface{}
|
||||
}
|
||||
|
||||
// Service is the main structure that tracks validators and reports logs and
|
||||
// metrics of their performances throughout their lifetime.
|
||||
type Service struct {
|
||||
config *ValidatorMonitorConfig
|
||||
config *ValidatorMonitorConfig
|
||||
latestPerformance map[types.ValidatorIndex]ValidatorLatestPerformance
|
||||
aggregatedPerformance map[types.ValidatorIndex]ValidatorAggregatedPerformance
|
||||
}
|
||||
|
||||
// TrackedIndex returns if the given validator index corresponds to one of the
|
||||
|
||||
@@ -37,6 +37,15 @@ func (s *State) HasStateInCache(ctx context.Context, blockRoot [32]byte) (bool,
|
||||
return has, nil
|
||||
}
|
||||
|
||||
// StateByRootIfCached retrieves a state using the input block root only if the state is already in the cache
|
||||
func (s *State) StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState {
|
||||
if !s.hotStateCache.has(blockRoot) {
|
||||
return nil
|
||||
}
|
||||
state := s.hotStateCache.getWithoutCopy(blockRoot)
|
||||
return state
|
||||
}
|
||||
|
||||
// StateByRoot retrieves the state using input block root.
|
||||
func (s *State) StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "stateGen.StateByRoot")
|
||||
|
||||
@@ -56,6 +56,44 @@ func TestStateByRoot_ColdState(t *testing.T) {
|
||||
require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe())
|
||||
}
|
||||
|
||||
func TestStateByRootIfCachedNoCopy_HotState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
|
||||
service := New(beaconDB)
|
||||
|
||||
beaconState, _ := util.DeterministicGenesisState(t, 32)
|
||||
r := [32]byte{'A'}
|
||||
require.NoError(t, service.beaconDB.SaveStateSummary(ctx, ðpb.StateSummary{Root: r[:]}))
|
||||
service.hotStateCache.put(r, beaconState)
|
||||
|
||||
loadedState := service.StateByRootIfCachedNoCopy(r)
|
||||
require.DeepSSZEqual(t, loadedState.InnerStateUnsafe(), beaconState.InnerStateUnsafe())
|
||||
}
|
||||
|
||||
func TestStateByRootIfCachedNoCopy_ColdState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
|
||||
service := New(beaconDB)
|
||||
service.finalizedInfo.slot = 2
|
||||
service.slotsPerArchivedPoint = 1
|
||||
|
||||
b := util.NewBeaconBlock()
|
||||
b.Block.Slot = 1
|
||||
require.NoError(t, beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(b)))
|
||||
bRoot, err := b.Block.HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
beaconState, _ := util.DeterministicGenesisState(t, 32)
|
||||
require.NoError(t, beaconState.SetSlot(1))
|
||||
require.NoError(t, service.beaconDB.SaveState(ctx, beaconState, bRoot))
|
||||
require.NoError(t, service.beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(b)))
|
||||
require.NoError(t, service.beaconDB.SaveGenesisBlockRoot(ctx, bRoot))
|
||||
loadedState := service.StateByRootIfCachedNoCopy(bRoot)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, loadedState, nil)
|
||||
}
|
||||
|
||||
func TestStateByRoot_HotStateUsingEpochBoundaryCacheNoReplay(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
|
||||
@@ -23,6 +23,11 @@ func NewMockService() *MockStateManager {
|
||||
}
|
||||
}
|
||||
|
||||
// StateByRootIfCached
|
||||
func (m *MockStateManager) StateByRootIfCachedNoCopy(_ [32]byte) state.BeaconState { // skipcq: RVV-B0013
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
// Resume --
|
||||
func (m *MockStateManager) Resume(_ context.Context, _ state.BeaconState) (state.BeaconState, error) {
|
||||
panic("implement me")
|
||||
|
||||
@@ -31,6 +31,7 @@ type StateManager interface {
|
||||
HasState(ctx context.Context, blockRoot [32]byte) (bool, error)
|
||||
HasStateInCache(ctx context.Context, blockRoot [32]byte) (bool, error)
|
||||
StateByRoot(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
|
||||
StateByRootIfCachedNoCopy(blockRoot [32]byte) state.BeaconState
|
||||
StateByRootInitialSync(ctx context.Context, blockRoot [32]byte) (state.BeaconState, error)
|
||||
StateBySlot(ctx context.Context, slot types.Slot) (state.BeaconState, error)
|
||||
RecoverStateSummary(ctx context.Context, blockRoot [32]byte) (*ethpb.StateSummary, error)
|
||||
|
||||
Reference in New Issue
Block a user