Monitor service (#9933)

* Add a service for the monitor

* Do not block service start

* gaz

* move channel subscription outside goroutine

* add service start test

* fix panic on node tests

* Radek's first pass

* Radek's take 2

* uncap error messages

* revert reversal

* Terence take 1

* gaz

* Missing locks found by Terence

* Track via bool not empty interface

* Add tests for every function

* fix allocation of slice

* Minor cleanups

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Author: Potuz
Date: 2021-11-30 19:27:03 -03:00
Committed by: GitHub
Parent: 2c921ec628
Commit: afbe02697d
12 changed files with 569 additions and 83 deletions

View File

@@ -14,8 +14,13 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor", importpath = "github.com/prysmaticlabs/prysm/beacon-chain/monitor",
visibility = ["//beacon-chain:__subpackages__"], visibility = ["//beacon-chain:__subpackages__"],
deps = [ deps = [
"//async/event:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/altair:go_default_library", "//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library", "//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library", "//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/state:go_default_library", "//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library", "//beacon-chain/state/stategen:go_default_library",
@@ -44,7 +49,10 @@ go_test(
     ],
     embed = [":go_default_library"],
     deps = [
+        "//beacon-chain/blockchain/testing:go_default_library",
         "//beacon-chain/core/altair:go_default_library",
+        "//beacon-chain/core/feed:go_default_library",
+        "//beacon-chain/core/feed/state:go_default_library",
         "//beacon-chain/db/testing:go_default_library",
         "//beacon-chain/state/stategen:go_default_library",
         "//config/params:go_default_library",
@@ -53,6 +61,7 @@ go_test(
"//proto/prysm/v1alpha1/wrapper:go_default_library", "//proto/prysm/v1alpha1/wrapper:go_default_library",
"//testing/require:go_default_library", "//testing/require:go_default_library",
"//testing/util:go_default_library", "//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library", "@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -5,10 +5,7 @@ import (
"context" "context"
"testing" "testing"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/encoding/bytesutil" "github.com/prysmaticlabs/prysm/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
@@ -18,52 +15,6 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test" logTest "github.com/sirupsen/logrus/hooks/test"
) )
func setupService(t *testing.T) *Service {
beaconDB := testDB.SetupDB(t)
trackedVals := map[types.ValidatorIndex]interface{}{
1: nil,
2: nil,
12: nil,
15: nil,
}
latestPerformance := map[types.ValidatorIndex]ValidatorLatestPerformance{
1: {
balance: 32000000000,
},
2: {
balance: 32000000000,
},
12: {
balance: 31900000000,
},
15: {
balance: 31900000000,
},
}
aggregatedPerformance := map[types.ValidatorIndex]ValidatorAggregatedPerformance{
1: {},
2: {},
12: {},
15: {},
}
trackedSyncCommitteeIndices := map[types.ValidatorIndex][]types.CommitteeIndex{
1: {0, 1, 2, 3},
12: {4, 5},
}
return &Service{
config: &ValidatorMonitorConfig{
StateGen: stategen.New(beaconDB),
},
TrackedValidators: trackedVals,
latestPerformance: latestPerformance,
aggregatedPerformance: aggregatedPerformance,
trackedSyncCommitteeIndices: trackedSyncCommitteeIndices,
lastSyncedEpoch: 0,
}
}
func TestGetAttestingIndices(t *testing.T) { func TestGetAttestingIndices(t *testing.T) {
ctx := context.Background() ctx := context.Background()
beaconState, _ := util.DeterministicGenesisState(t, 256) beaconState, _ := util.DeterministicGenesisState(t, 256)

View File

@@ -117,9 +117,9 @@ func TestProcessSlashings(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             hook := logTest.NewGlobal()
             s := &Service{
-                TrackedValidators: map[types.ValidatorIndex]interface{}{
-                    1: nil,
-                    2: nil,
+                TrackedValidators: map[types.ValidatorIndex]bool{
+                    1: true,
+                    2: true,
                 },
             }
             s.processSlashings(wrapper.WrappedPhase0BeaconBlock(tt.block))
@@ -205,7 +205,7 @@ func TestProcessBlock_AllEventsTrackedVals(t *testing.T) {
     idx := b.Block.Body.ProposerSlashings[0].Header_1.Header.ProposerIndex
     s.RLock()
     if !s.trackedIndex(idx) {
-        s.TrackedValidators[idx] = nil
+        s.TrackedValidators[idx] = true
         s.latestPerformance[idx] = ValidatorLatestPerformance{
             balance: 31900000000,
         }
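
Every fixture in these tests changes the tracked-validator set from map[types.ValidatorIndex]interface{} with nil values to map[types.ValidatorIndex]bool (the "Track via bool not empty interface" item in the commit message). A minimal standalone sketch of the bool-keyed set, assuming only the eth2-types module rather than anything from this PR:

package main

import (
    "fmt"

    types "github.com/prysmaticlabs/eth2-types"
)

func main() {
    // A bool-valued map doubles as a set: a missing key yields the zero
    // value false, so membership is a single index expression.
    tracked := map[types.ValidatorIndex]bool{1: true, 2: true, 12: true, 15: true}
    fmt.Println(tracked[12]) // true
    fmt.Println(tracked[3])  // false
}

With interface{} values the same check needs the two-value form _, ok := tracked[idx]; the bool form keeps membership tests to a plain lookup.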

View File

@@ -13,9 +13,9 @@ import (
 func TestProcessExitsFromBlockTrackedIndices(t *testing.T) {
     hook := logTest.NewGlobal()
     s := &Service{
-        TrackedValidators: map[types.ValidatorIndex]interface{}{
-            1: nil,
-            2: nil,
+        TrackedValidators: map[types.ValidatorIndex]bool{
+            1: true,
+            2: true,
         },
     }
@@ -47,9 +47,9 @@ func TestProcessExitsFromBlockTrackedIndices(t *testing.T) {
 func TestProcessExitsFromBlockUntrackedIndices(t *testing.T) {
     hook := logTest.NewGlobal()
     s := &Service{
-        TrackedValidators: map[types.ValidatorIndex]interface{}{
-            1: nil,
-            2: nil,
+        TrackedValidators: map[types.ValidatorIndex]bool{
+            1: true,
+            2: true,
         },
     }
@@ -81,9 +81,9 @@ func TestProcessExitsFromBlockUntrackedIndices(t *testing.T) {
 func TestProcessExitP2PTrackedIndices(t *testing.T) {
     hook := logTest.NewGlobal()
     s := &Service{
-        TrackedValidators: map[types.ValidatorIndex]interface{}{
-            1: nil,
-            2: nil,
+        TrackedValidators: map[types.ValidatorIndex]bool{
+            1: true,
+            2: true,
         },
     }
@@ -101,9 +101,9 @@ func TestProcessExitP2PTrackedIndices(t *testing.T) {
 func TestProcessExitP2PUntrackedIndices(t *testing.T) {
     hook := logTest.NewGlobal()
     s := &Service{
-        TrackedValidators: map[types.ValidatorIndex]interface{}{
-            1: nil,
-            2: nil,
+        TrackedValidators: map[types.ValidatorIndex]bool{
+            1: true,
+            2: true,
         },
     }

View File

@@ -1,13 +1,30 @@
 package monitor
 
 import (
+    "context"
+    "errors"
+    "sort"
     "sync"
 
     types "github.com/prysmaticlabs/eth2-types"
+    "github.com/prysmaticlabs/prysm/async/event"
+    "github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
+    "github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
+    "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
+    statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
     "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
     "github.com/prysmaticlabs/prysm/beacon-chain/state"
     "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
     "github.com/prysmaticlabs/prysm/time/slots"
+    "github.com/sirupsen/logrus"
+)
+
+var (
+    // Error when event feed data is not statefeed.SyncedData.
+    errNotSyncedData = errors.New("event feed data is not of type *statefeed.SyncedData")
+    // Error when the context is closed while waiting for sync.
+    errContextClosedWhileWaiting = errors.New("context closed while waiting for beacon to sync to latest Head")
 )
 
 // ValidatorLatestPerformance keeps track of the latest participation of the validator
@@ -24,6 +41,8 @@ type ValidatorLatestPerformance struct {
 // ValidatorAggregatedPerformance keeps track of the accumulated performance of
 // the validator since launch
 type ValidatorAggregatedPerformance struct {
+    startEpoch          types.Epoch
+    startBalance        uint64
     totalAttestedCount  uint64
     totalRequestedCount uint64
     totalDistance       uint64
@@ -40,25 +59,229 @@ type ValidatorAggregatedPerformance struct {
 // monitor service tracks, as well as the event feed notifier that the
 // monitor needs to subscribe.
 type ValidatorMonitorConfig struct {
-    StateGen stategen.StateManager
+    StateNotifier       statefeed.Notifier
+    AttestationNotifier operation.Notifier
+    HeadFetcher         blockchain.HeadFetcher
+    StateGen            stategen.StateManager
 }
 
 // Service is the main structure that tracks validators and reports logs and
 // metrics of their performances throughout their lifetime.
 type Service struct {
     config *ValidatorMonitorConfig
+    ctx       context.Context
+    cancel    context.CancelFunc
+    isLogging bool
 
     // Locks access to TrackedValidators, latestPerformance, aggregatedPerformance,
     // trackedSyncedCommitteeIndices and lastSyncedEpoch
     sync.RWMutex
-    TrackedValidators           map[types.ValidatorIndex]interface{}
+    TrackedValidators           map[types.ValidatorIndex]bool
     latestPerformance           map[types.ValidatorIndex]ValidatorLatestPerformance
     aggregatedPerformance       map[types.ValidatorIndex]ValidatorAggregatedPerformance
     trackedSyncCommitteeIndices map[types.ValidatorIndex][]types.CommitteeIndex
     lastSyncedEpoch             types.Epoch
 }
+
+// NewService sets up a new validator monitor instance when given a list of validator indices to track.
+func NewService(ctx context.Context, config *ValidatorMonitorConfig, tracked []types.ValidatorIndex) (*Service, error) {
+    ctx, cancel := context.WithCancel(ctx)
+    r := &Service{
+        config:                      config,
+        ctx:                         ctx,
+        cancel:                      cancel,
+        TrackedValidators:           make(map[types.ValidatorIndex]bool, len(tracked)),
+        latestPerformance:           make(map[types.ValidatorIndex]ValidatorLatestPerformance),
+        aggregatedPerformance:       make(map[types.ValidatorIndex]ValidatorAggregatedPerformance),
+        trackedSyncCommitteeIndices: make(map[types.ValidatorIndex][]types.CommitteeIndex),
+    }
+    for _, idx := range tracked {
+        r.TrackedValidators[idx] = true
+    }
+    return r, nil
+}
+
+// Start sets up the TrackedValidators map and then calls to wait until the beacon is synced.
+func (s *Service) Start() {
+    s.Lock()
+    defer s.Unlock()
+    tracked := make([]types.ValidatorIndex, 0, len(s.TrackedValidators))
+    for idx := range s.TrackedValidators {
+        tracked = append(tracked, idx)
+    }
+    sort.Slice(tracked, func(i, j int) bool { return tracked[i] < tracked[j] })
+    log.WithFields(logrus.Fields{
+        "ValidatorIndices": tracked,
+    }).Info("Starting service")
+    s.isLogging = false
+    stateChannel := make(chan *feed.Event, 1)
+    stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
+    go s.run(stateChannel, stateSub)
+}
+
+// run waits until the beacon is synced and starts the monitoring system.
+func (s *Service) run(stateChannel chan *feed.Event, stateSub event.Subscription) {
+    if stateChannel == nil {
+        log.Error("State state is nil")
+        return
+    }
+    if err := s.waitForSync(stateChannel, stateSub); err != nil {
+        log.WithError(err)
+        return
+    }
+    state, err := s.config.HeadFetcher.HeadState(s.ctx)
+    if err != nil {
+        log.WithError(err).Error("Could not get head state")
+        return
+    }
+    if state == nil {
+        log.Error("Head state is nil")
+        return
+    }
+    epoch := slots.ToEpoch(state.Slot())
+    log.WithField("Epoch", epoch).Info("Synced to head epoch, starting reporting performance")
+
+    s.Lock()
+    s.initializePerformanceStructures(state, epoch)
+    s.Unlock()
+    s.updateSyncCommitteeTrackedVals(state)
+
+    s.Lock()
+    s.isLogging = true
+    s.Unlock()
+    s.monitorRoutine(stateChannel, stateSub)
+}
+
+// initializePerformanceStructures initializes the validatorLatestPerformance
+// and validatorAggregatedPerformance for each tracked validator.
+func (s *Service) initializePerformanceStructures(state state.BeaconState, epoch types.Epoch) {
+    for idx := range s.TrackedValidators {
+        balance, err := state.BalanceAtIndex(idx)
+        if err != nil {
+            log.WithError(err).WithField("ValidatorIndex", idx).Error(
+                "Could not fetch starting balance, skipping aggregated logs.")
+            balance = 0
+        }
+        s.aggregatedPerformance[idx] = ValidatorAggregatedPerformance{
+            startEpoch:   epoch,
+            startBalance: balance,
+        }
+        s.latestPerformance[idx] = ValidatorLatestPerformance{
+            balance: balance,
+        }
+    }
+}
+
+// Status retrieves the status of the service.
+func (s *Service) Status() error {
+    if s.isLogging {
+        return nil
+    }
+    return errors.New("not running")
+}
+
+// Stop stops the service.
+func (s *Service) Stop() error {
+    defer s.cancel()
+    s.isLogging = false
+    return nil
+}
+
+// waitForSync waits until the beacon node is synced to the latest head.
+func (s *Service) waitForSync(stateChannel chan *feed.Event, stateSub event.Subscription) error {
+    for {
+        select {
+        case event := <-stateChannel:
+            if event.Type == statefeed.Synced {
+                _, ok := event.Data.(*statefeed.SyncedData)
+                if !ok {
+                    return errNotSyncedData
+                }
+                return nil
+            }
+        case <-s.ctx.Done():
+            log.Debug("Context closed, exiting goroutine")
+            return errContextClosedWhileWaiting
+        case err := <-stateSub.Err():
+            log.WithError(err).Error("Could not subscribe to state notifier")
+            return err
+        }
+    }
+}
+
+// monitorRoutine is the main dispatcher, it registers event channels for the
+// state feed and the operation feed. It then calls the appropriate function
+// when we get messages after syncing a block or processing attestations/sync
+// committee contributions.
+func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.Subscription) {
+    defer stateSub.Unsubscribe()
+
+    opChannel := make(chan *feed.Event, 1)
+    opSub := s.config.AttestationNotifier.OperationFeed().Subscribe(opChannel)
+    defer opSub.Unsubscribe()
+
+    for {
+        select {
+        case event := <-stateChannel:
+            if event.Type == statefeed.BlockProcessed {
+                data, ok := event.Data.(*statefeed.BlockProcessedData)
+                if !ok {
+                    log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
+                } else if data.Verified {
+                    // We only process blocks that have been verified
+                    s.processBlock(s.ctx, data.SignedBlock)
+                }
+            }
+        case event := <-opChannel:
+            switch event.Type {
+            case operation.UnaggregatedAttReceived:
+                data, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
+                if !ok {
+                    log.Error("Event feed data is not of type *operation.UnAggregatedAttReceivedData")
+                } else {
+                    s.processUnaggregatedAttestation(s.ctx, data.Attestation)
+                }
+            case operation.AggregatedAttReceived:
+                data, ok := event.Data.(*operation.AggregatedAttReceivedData)
+                if !ok {
+                    log.Error("Event feed data is not of type *operation.AggregatedAttReceivedData")
+                } else {
+                    s.processAggregatedAttestation(s.ctx, data.Attestation)
+                }
+            case operation.ExitReceived:
+                data, ok := event.Data.(*operation.ExitReceivedData)
+                if !ok {
+                    log.Error("Event feed data is not of type *operation.ExitReceivedData")
+                } else {
+                    s.processExit(data.Exit)
+                }
+            case operation.SyncCommitteeContributionReceived:
+                data, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData)
+                if !ok {
+                    log.Error("Event feed data is not of type *operation.SyncCommitteeContributionReceivedData")
+                } else {
+                    s.processSyncCommitteeContribution(data.Contribution)
+                }
+            }
+        case <-s.ctx.Done():
+            log.Debug("Context closed, exiting goroutine")
+            return
+        case err := <-stateSub.Err():
+            log.WithError(err).Error("Could not subscribe to state notifier")
+            return
+        }
+    }
+}
+
 // TrackedIndex returns if the given validator index corresponds to one of the
 // validators we follow.
 // It assumes the caller holds the service Lock
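
A detail worth noting in Start above: the state-feed subscription is created on the caller's goroutine before run is launched (the "move channel subscription outside goroutine" item in the commit message), so a Synced event published in between cannot be missed. A self-contained sketch of that subscribe-then-spawn pattern, assuming Prysm's async/event package keeps the go-ethereum-style Feed API it is forked from; this is an illustration, not the monitor code itself:

package main

import (
    "fmt"

    "github.com/prysmaticlabs/prysm/async/event"
)

func main() {
    var f event.Feed // the zero-value feed is ready to use

    // Subscribe first, on the current goroutine...
    ch := make(chan int, 1)
    sub := f.Subscribe(ch)
    defer sub.Unsubscribe()

    // ...then hand the channel to the consumer goroutine. A value sent
    // after Subscribe returns is delivered to ch rather than dropped.
    done := make(chan struct{})
    go func() {
        fmt.Println(<-ch) // prints 42
        close(done)
    }()

    f.Send(42)
    <-done
}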

View File

@@ -1,29 +1,30 @@
 package monitor
 
 import (
+    "context"
+    "fmt"
+    "sync"
     "testing"
+    "time"
 
     types "github.com/prysmaticlabs/eth2-types"
+    mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
+    "github.com/prysmaticlabs/prysm/beacon-chain/core/altair"
+    "github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
+    statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
+    testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
+    "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
+    "github.com/prysmaticlabs/prysm/encoding/bytesutil"
+    "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
     "github.com/prysmaticlabs/prysm/testing/require"
     "github.com/prysmaticlabs/prysm/testing/util"
+    "github.com/prysmaticlabs/prysm/time/slots"
     logTest "github.com/sirupsen/logrus/hooks/test"
 )
 
-func TestTrackedIndex(t *testing.T) {
-    s := &Service{
-        TrackedValidators: map[types.ValidatorIndex]interface{}{
-            1: nil,
-            2: nil,
-        },
-    }
-    require.Equal(t, s.trackedIndex(types.ValidatorIndex(1)), true)
-    require.Equal(t, s.trackedIndex(types.ValidatorIndex(3)), false)
-}
-
-func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
-    hook := logTest.NewGlobal()
-    s := setupService(t)
-    state, _ := util.DeterministicGenesisStateAltair(t, 1024)
+func setupService(t *testing.T) *Service {
+    beaconDB := testDB.SetupDB(t)
+    state, _ := util.DeterministicGenesisStateAltair(t, 256)
 
     pubKeys := make([][]byte, 3)
     pubKeys[0] = state.Validators()[0].PublicKey
@@ -35,6 +36,77 @@ func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
     })
     require.NoError(t, state.SetCurrentSyncCommittee(currentSyncCommittee))
+
+    chainService := &mock.ChainService{
+        Genesis:        time.Now(),
+        DB:             beaconDB,
+        State:          state,
+        Root:           []byte("hello-world"),
+        ValidatorsRoot: [32]byte{},
+    }
+
+    trackedVals := map[types.ValidatorIndex]bool{
+        1:  true,
+        2:  true,
+        12: true,
+        15: true,
+    }
+    latestPerformance := map[types.ValidatorIndex]ValidatorLatestPerformance{
+        1: {
+            balance: 32000000000,
+        },
+        2: {
+            balance: 32000000000,
+        },
+        12: {
+            balance: 31900000000,
+        },
+        15: {
+            balance: 31900000000,
+        },
+    }
+    aggregatedPerformance := map[types.ValidatorIndex]ValidatorAggregatedPerformance{
+        1:  {},
+        2:  {},
+        12: {},
+        15: {},
+    }
+    trackedSyncCommitteeIndices := map[types.ValidatorIndex][]types.CommitteeIndex{
+        1:  {0, 1, 2, 3},
+        12: {4, 5},
+    }
+    return &Service{
+        config: &ValidatorMonitorConfig{
+            StateGen:            stategen.New(beaconDB),
+            StateNotifier:       chainService.StateNotifier(),
+            HeadFetcher:         chainService,
+            AttestationNotifier: chainService.OperationNotifier(),
+        },
+        ctx:                         context.Background(),
+        TrackedValidators:           trackedVals,
+        latestPerformance:           latestPerformance,
+        aggregatedPerformance:       aggregatedPerformance,
+        trackedSyncCommitteeIndices: trackedSyncCommitteeIndices,
+        lastSyncedEpoch:             0,
+    }
+}
+
+func TestTrackedIndex(t *testing.T) {
+    s := &Service{
+        TrackedValidators: map[types.ValidatorIndex]bool{
+            1: true,
+            2: true,
+        },
+    }
+    require.Equal(t, s.trackedIndex(types.ValidatorIndex(1)), true)
+    require.Equal(t, s.trackedIndex(types.ValidatorIndex(3)), false)
+}
+
+func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
+    hook := logTest.NewGlobal()
+    s := setupService(t)
+    state, _ := util.DeterministicGenesisStateAltair(t, 1024)
+
     s.updateSyncCommitteeTrackedVals(state)
     require.LogsDoNotContain(t, hook, "Sync committee assignments will not be reported")
     newTrackedSyncIndices := map[types.ValidatorIndex][]types.CommitteeIndex{
@@ -43,3 +115,186 @@ func TestUpdateSyncCommitteeTrackedVals(t *testing.T) {
     }
     require.DeepEqual(t, s.trackedSyncCommitteeIndices, newTrackedSyncIndices)
 }
+
+func TestNewService(t *testing.T) {
+    config := &ValidatorMonitorConfig{}
+    tracked := []types.ValidatorIndex{}
+    ctx := context.Background()
+    _, err := NewService(ctx, config, tracked)
+    require.NoError(t, err)
+}
+
+func TestStart(t *testing.T) {
+    hook := logTest.NewGlobal()
+    s := setupService(t)
+    stateChannel := make(chan *feed.Event, 1)
+    stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
+    defer stateSub.Unsubscribe()
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+
+    s.Start()
+    go func() {
+        select {
+        case stateEvent := <-stateChannel:
+            if stateEvent.Type == statefeed.Synced {
+                _, ok := stateEvent.Data.(*statefeed.SyncedData)
+                require.Equal(t, true, ok, "Event feed data is not type *statefeed.SyncedData")
+            }
+        case <-s.ctx.Done():
+        }
+        wg.Done()
+    }()
+
+    for sent := 0; sent == 0; {
+        sent = s.config.StateNotifier.StateFeed().Send(&feed.Event{
+            Type: statefeed.Synced,
+            Data: &statefeed.SyncedData{
+                StartTime: time.Now(),
+            },
+        })
+    }
+
+    // wait for Logrus
+    time.Sleep(1000 * time.Millisecond)
+    require.LogsContain(t, hook, "Synced to head epoch, starting reporting performance")
+    require.LogsContain(t, hook, "\"Starting service\" ValidatorIndices=\"[1 2 12 15]\"")
+    require.Equal(t, s.isLogging, true, "monitor is not running")
+}
+
+func TestInitializePerformanceStructures(t *testing.T) {
+    hook := logTest.NewGlobal()
+    ctx := context.Background()
+    s := setupService(t)
+    state, err := s.config.HeadFetcher.HeadState(ctx)
+    require.NoError(t, err)
+    epoch := slots.ToEpoch(state.Slot())
+    s.initializePerformanceStructures(state, epoch)
+
+    require.LogsDoNotContain(t, hook, "Could not fetch starting balance")
+    latestPerformance := map[types.ValidatorIndex]ValidatorLatestPerformance{
+        1: {
+            balance: 32000000000,
+        },
+        2: {
+            balance: 32000000000,
+        },
+        12: {
+            balance: 32000000000,
+        },
+        15: {
+            balance: 32000000000,
+        },
+    }
+    aggregatedPerformance := map[types.ValidatorIndex]ValidatorAggregatedPerformance{
+        1: {
+            startBalance: 32000000000,
+        },
+        2: {
+            startBalance: 32000000000,
+        },
+        12: {
+            startBalance: 32000000000,
+        },
+        15: {
+            startBalance: 32000000000,
+        },
+    }
+
+    require.DeepEqual(t, s.latestPerformance, latestPerformance)
+    require.DeepEqual(t, s.aggregatedPerformance, aggregatedPerformance)
+}
+
+func TestMonitorRoutine(t *testing.T) {
+    ctx := context.Background()
+    hook := logTest.NewGlobal()
+    s := setupService(t)
+    stateChannel := make(chan *feed.Event, 1)
+    stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        s.monitorRoutine(stateChannel, stateSub)
+        wg.Done()
+    }()
+
+    genesis, keys := util.DeterministicGenesisStateAltair(t, 64)
+    c, err := altair.NextSyncCommittee(ctx, genesis)
+    require.NoError(t, err)
+    require.NoError(t, genesis.SetCurrentSyncCommittee(c))
+
+    genConfig := util.DefaultBlockGenConfig()
+    block, err := util.GenerateFullBlockAltair(genesis, keys, genConfig, 1)
+    require.NoError(t, err)
+    root, err := block.GetBlock().HashTreeRoot()
+    require.NoError(t, err)
+    require.NoError(t, s.config.StateGen.SaveState(ctx, root, genesis))
+
+    wrapped, err := wrapper.WrappedAltairSignedBeaconBlock(block)
+    require.NoError(t, err)
+    stateChannel <- &feed.Event{
+        Type: statefeed.BlockProcessed,
+        Data: &statefeed.BlockProcessedData{
+            Slot:        1,
+            Verified:    true,
+            SignedBlock: wrapped,
+        },
+    }
+
+    // Wait for Logrus
+    time.Sleep(1000 * time.Millisecond)
+    wanted1 := fmt.Sprintf("\"Proposed block was included\" BalanceChange=100000000 BlockRoot=%#x NewBalance=32000000000 ParentRoot=0xf732eaeb7fae ProposerIndex=15 Slot=1 Version=1 prefix=monitor", bytesutil.Trunc(root[:]))
+    require.LogsContain(t, hook, wanted1)
+}
+
+func TestWaitForSync(t *testing.T) {
+    s := setupService(t)
+    stateChannel := make(chan *feed.Event, 1)
+    stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
+    defer stateSub.Unsubscribe()
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        err := s.waitForSync(stateChannel, stateSub)
+        require.NoError(t, err)
+        wg.Done()
+    }()
+
+    stateChannel <- &feed.Event{
+        Type: statefeed.Synced,
+        Data: &statefeed.SyncedData{
+            StartTime: time.Now(),
+        },
+    }
+}
+
+func TestRun(t *testing.T) {
+    hook := logTest.NewGlobal()
+    s := setupService(t)
+    stateChannel := make(chan *feed.Event, 1)
+    stateSub := s.config.StateNotifier.StateFeed().Subscribe(stateChannel)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        s.run(stateChannel, stateSub)
+        wg.Done()
+    }()
+
+    stateChannel <- &feed.Event{
+        Type: statefeed.Synced,
+        Data: &statefeed.SyncedData{
+            StartTime: time.Now(),
+        },
+    }
+
+    //wait for Logrus
+    time.Sleep(1000 * time.Millisecond)
+    require.LogsContain(t, hook, "Synced to head epoch, starting reporting performance")
+}

View File

@@ -26,6 +26,7 @@ go_library(
"//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/protoarray:go_default_library", "//beacon-chain/forkchoice/protoarray:go_default_library",
"//beacon-chain/gateway:go_default_library", "//beacon-chain/gateway:go_default_library",
"//beacon-chain/monitor:go_default_library",
"//beacon-chain/node/registration:go_default_library", "//beacon-chain/node/registration:go_default_library",
"//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/slashings:go_default_library",

View File

@@ -17,6 +17,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors" "github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
apigateway "github.com/prysmaticlabs/prysm/api/gateway" apigateway "github.com/prysmaticlabs/prysm/api/gateway"
"github.com/prysmaticlabs/prysm/async/event" "github.com/prysmaticlabs/prysm/async/event"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
@@ -28,6 +29,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice" "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice"
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray" "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray"
"github.com/prysmaticlabs/prysm/beacon-chain/gateway" "github.com/prysmaticlabs/prysm/beacon-chain/gateway"
"github.com/prysmaticlabs/prysm/beacon-chain/monitor"
"github.com/prysmaticlabs/prysm/beacon-chain/node/registration" "github.com/prysmaticlabs/prysm/beacon-chain/node/registration"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
@@ -205,6 +207,10 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
         return nil, err
     }
 
+    if err := beacon.registerValidatorMonitorService(); err != nil {
+        return nil, err
+    }
+
     if !cliCtx.Bool(cmd.DisableMonitoringFlag.Name) {
         if err := beacon.registerPrometheusService(cliCtx); err != nil {
             return nil, err
@@ -866,3 +872,33 @@ func (b *BeaconNode) registerDeterminsticGenesisService() error {
     }
     return nil
 }
+
+func (b *BeaconNode) registerValidatorMonitorService() error {
+    if cmd.ValidatorMonitorIndicesFlag.Value == nil {
+        return nil
+    }
+    cliSlice := cmd.ValidatorMonitorIndicesFlag.Value.Value()
+    if cliSlice == nil {
+        return nil
+    }
+    tracked := make([]types.ValidatorIndex, len(cliSlice))
+    for i := range tracked {
+        tracked[i] = types.ValidatorIndex(cliSlice[i])
+    }
+
+    var chainService *blockchain.Service
+    if err := b.services.FetchService(&chainService); err != nil {
+        return err
+    }
+    monitorConfig := &monitor.ValidatorMonitorConfig{
+        StateNotifier:       b,
+        AttestationNotifier: b,
+        StateGen:            b.stateGen,
+        HeadFetcher:         chainService,
+    }
+    svc, err := monitor.NewService(b.ctx, monitorConfig, tracked)
+    if err != nil {
+        return err
+    }
+    return b.services.RegisterService(svc)
+}
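
registerValidatorMonitorService hands the monitor to b.services.RegisterService, so the new Service has to satisfy the registry's service contract. A hedged compile-time check one could write against the merged code — the interface below is inferred from the Start, Stop, and Status methods added in this diff, not copied from Prysm's runtime package:

package main

import "github.com/prysmaticlabs/prysm/beacon-chain/monitor"

// inferredService mirrors the method set the registry appears to require,
// reconstructed from the methods this commit adds to monitor.Service.
type inferredService interface {
    Start()
    Stop() error
    Status() error
}

// Compile-time assertion that *monitor.Service provides that method set.
var _ inferredService = (*monitor.Service)(nil)

func main() {}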

View File

@@ -118,6 +118,7 @@ var appFlags = []cli.Flag{
     cmd.RestoreSourceFileFlag,
     cmd.RestoreTargetDirFlag,
     cmd.BoltMMapInitialSizeFlag,
+    cmd.ValidatorMonitorIndicesFlag,
 }
 
 func init() {

View File

@@ -73,6 +73,7 @@ var appHelpFlagGroups = []flagGroup{
             cmd.RestoreSourceFileFlag,
             cmd.RestoreTargetDirFlag,
             cmd.BoltMMapInitialSizeFlag,
+            cmd.ValidatorMonitorIndicesFlag,
         },
     },
     {

View File

@@ -224,6 +224,13 @@ var (
Name: "accept-terms-of-use", Name: "accept-terms-of-use",
Usage: "Accept Terms and Conditions (for non-interactive environments)", Usage: "Accept Terms and Conditions (for non-interactive environments)",
} }
// ValidatorMonitorIndicesFlag specifies a list of validator indices to
// track for performance updates
ValidatorMonitorIndicesFlag = &cli.IntSliceFlag{
Name: "monitor-indices",
Usage: "List of validator indices to track performance",
}
// RestoreSourceFileFlag specifies the filepath to the backed-up database file // RestoreSourceFileFlag specifies the filepath to the backed-up database file
// which will be used to restore the database. // which will be used to restore the database.
RestoreSourceFileFlag = &cli.StringFlag{ RestoreSourceFileFlag = &cli.StringFlag{
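
For context on the new flag, a small standalone urfave/cli program showing how an IntSliceFlag such as --monitor-indices collects its values; this illustrates the flag type under the assumed v2 urfave/cli API and is not Prysm's own CLI wiring:

package main

import (
    "fmt"
    "log"
    "os"

    "github.com/urfave/cli/v2"
)

func main() {
    app := &cli.App{
        Flags: []cli.Flag{
            &cli.IntSliceFlag{
                Name:  "monitor-indices",
                Usage: "List of validator indices to track performance",
            },
        },
        Action: func(c *cli.Context) error {
            // Invoked as: prog --monitor-indices 1 --monitor-indices 2 --monitor-indices 12
            fmt.Println(c.IntSlice("monitor-indices")) // [1 2 12]
            return nil
        },
    }
    if err := app.Run(os.Args); err != nil {
        log.Fatal(err)
    }
}

Repeating the flag once per index is the portable invocation; whether comma-separated values are also split depends on the cli version.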

View File

@@ -33,6 +33,8 @@ func WrapFlags(flags []cli.Flag) []cli.Flag {
         case *cli.Int64Flag:
             // Int64Flag does not work. See https://github.com/prysmaticlabs/prysm/issues/6478
             panic(fmt.Sprintf("unsupported flag type type %T", f))
+        case *cli.IntSliceFlag:
+            f = altsrc.NewIntSliceFlag(t)
         default:
             panic(fmt.Sprintf("cannot convert type %T", f))
         }