changes for safe validator shutdown and restarts on healthcheck (#15401)

* poc changes for safe validator shutdown

* simplifying health routine and adding safe shutdown after max restarts reached

* fixing health tests

* fixing tests

* changelog

* gofmt

* fixing runner

* improve how runner times out

* improvements to ux on logs

* linting

* adding in max healthcheck flag

* changelog

* Update james-prysm_safe-validator-shutdown.md

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/runner.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/service.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/runner.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/runner.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* addressing some feedback from radek

* addressing some more feedback

* fixing name based on feedback

* fixing mistake on max health checks

* conflict accidently checked in

* go 1.23 no longer needs you to stop for the ticker

* Update flags.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* wip no unit test for recursive healthy host find

* rework healthcheck

* gaz

* fixing bugs and improving logs with new monitor

* removing health tracker, fixing runner tests, and adding placeholder for monitor tests

* fixing event stream check

* linting

* adding in health monitor tests

* gaz

* improving test

* removing some log.fatals

* forgot to remove comment

* missed fatal removal

* doppleganger should exit the node safely now

* Update validator/client/health_monitor.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek review

* Update validator/client/validator.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/validator.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/health_monitor.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/health_monitor.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/health_monitor.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/client/validator.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek feedback

* read up on more suggestions and making fixes to channel

* suggested updates after more reading

* reverting some of this because it froze the validator after healthcheck failed

* fully reverting

* some improvements I found during testing

* Update cmd/validator/flags/flags.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* preston's feedback

* clarifications on changelog

* converted to using an event feed instead of my own channel publishing implementation, adding relevant logs

* preston log suggestion

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
james-prysm
2025-07-09 10:39:06 -05:00
committed by GitHub
parent 7025e50a6c
commit f2d57f0b5f
31 changed files with 1154 additions and 539 deletions

View File

@@ -1,20 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"health.go",
"interfaces.go",
"mock.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/api/client/beacon/health",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_mock//gomock:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["health_test.go"],
embed = [":go_default_library"],
deps = ["@org_uber_go_mock//gomock:go_default_library"],
)

View File

@@ -1,58 +0,0 @@
package health
import (
"context"
"sync"
)
type NodeHealthTracker struct {
isHealthy *bool
healthChan chan bool
node Node
sync.RWMutex
}
func NewTracker(node Node) Tracker {
return &NodeHealthTracker{
node: node,
healthChan: make(chan bool, 1),
}
}
// HealthUpdates provides a read-only channel for health updates.
func (n *NodeHealthTracker) HealthUpdates() <-chan bool {
return n.healthChan
}
func (n *NodeHealthTracker) IsHealthy(_ context.Context) bool {
n.RLock()
defer n.RUnlock()
if n.isHealthy == nil {
return false
}
return *n.isHealthy
}
func (n *NodeHealthTracker) CheckHealth(ctx context.Context) bool {
n.Lock()
defer n.Unlock()
newStatus := n.node.IsHealthy(ctx)
if n.isHealthy == nil {
n.isHealthy = &newStatus
}
isStatusChanged := newStatus != *n.isHealthy
if isStatusChanged {
// Update the health status
n.isHealthy = &newStatus
// Send the new status to the health channel, potentially overwriting the existing value
select {
case <-n.healthChan:
n.healthChan <- newStatus
default:
n.healthChan <- newStatus
}
}
return newStatus
}

View File

@@ -1,110 +0,0 @@
package health
import (
"sync"
"testing"
"go.uber.org/mock/gomock"
)
func TestNodeHealth_IsHealthy(t *testing.T) {
tests := []struct {
name string
isHealthy bool
want bool
}{
{"initially healthy", true, true},
{"initially unhealthy", false, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := &NodeHealthTracker{
isHealthy: &tt.isHealthy,
healthChan: make(chan bool, 1),
}
if got := n.IsHealthy(t.Context()); got != tt.want {
t.Errorf("IsHealthy() = %v, want %v", got, tt.want)
}
})
}
}
func TestNodeHealth_UpdateNodeHealth(t *testing.T) {
tests := []struct {
name string
initial bool // Initial health status
newStatus bool // Status to update to
shouldSend bool // Should a message be sent through the channel
}{
{"healthy to unhealthy", true, false, true},
{"unhealthy to healthy", false, true, true},
{"remain healthy", true, true, false},
{"remain unhealthy", false, false, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := NewMockHealthClient(ctrl)
client.EXPECT().IsHealthy(gomock.Any()).Return(tt.newStatus)
n := &NodeHealthTracker{
isHealthy: &tt.initial,
node: client,
healthChan: make(chan bool, 1),
}
s := n.CheckHealth(t.Context())
// Check if health status was updated
if s != tt.newStatus {
t.Errorf("UpdateNodeHealth() failed to update isHealthy from %v to %v", tt.initial, tt.newStatus)
}
select {
case status := <-n.HealthUpdates():
if !tt.shouldSend {
t.Errorf("UpdateNodeHealth() unexpectedly sent status %v to HealthCh", status)
} else if status != tt.newStatus {
t.Errorf("UpdateNodeHealth() sent wrong status %v, want %v", status, tt.newStatus)
}
default:
if tt.shouldSend {
t.Error("UpdateNodeHealth() did not send any status to HealthCh when expected")
}
}
})
}
}
func TestNodeHealth_Concurrency(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := NewMockHealthClient(ctrl)
n := NewTracker(client)
var wg sync.WaitGroup
// Number of goroutines to spawn for both reading and writing
numGoroutines := 6
wg.Add(numGoroutines * 2) // for readers and writers
// Concurrently update health status
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
client.EXPECT().IsHealthy(gomock.Any()).Return(false).Times(1)
n.CheckHealth(t.Context())
client.EXPECT().IsHealthy(gomock.Any()).Return(true).Times(1)
n.CheckHealth(t.Context())
}()
}
// Concurrently read health status
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
_ = n.IsHealthy(t.Context()) // Just read the value
}()
}
wg.Wait() // Wait for all goroutines to finish
}

View File

@@ -1,13 +0,0 @@
package health
import "context"
type Tracker interface {
HealthUpdates() <-chan bool
CheckHealth(ctx context.Context) bool
Node
}
type Node interface {
IsHealthy(ctx context.Context) bool
}

View File

@@ -1,58 +0,0 @@
package health
import (
"context"
"reflect"
"sync"
"go.uber.org/mock/gomock"
)
var (
_ = Node(&MockHealthClient{})
)
// MockHealthClient is a mock of HealthClient interface.
type MockHealthClient struct {
ctrl *gomock.Controller
recorder *MockHealthClientMockRecorder
sync.Mutex
}
// MockHealthClientMockRecorder is the mock recorder for MockHealthClient.
type MockHealthClientMockRecorder struct {
mock *MockHealthClient
}
// IsHealthy mocks base method.
func (m *MockHealthClient) IsHealthy(arg0 context.Context) bool {
m.Lock()
defer m.Unlock()
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsHealthy", arg0)
ret0, ok := ret[0].(bool)
if !ok {
return false
}
return ret0
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockHealthClient) EXPECT() *MockHealthClientMockRecorder {
return m.recorder
}
// IsHealthy indicates an expected call of IsHealthy.
func (mr *MockHealthClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
mr.mock.Lock()
defer mr.mock.Unlock()
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockHealthClient)(nil).IsHealthy), arg0)
}
// NewMockHealthClient creates a new mock instance.
func NewMockHealthClient(ctrl *gomock.Controller) *MockHealthClient {
mock := &MockHealthClient{ctrl: ctrl}
mock.recorder = &MockHealthClientMockRecorder{mock}
return mock
}

View File

@@ -0,0 +1,11 @@
## Added
- Added `max-health-checks` flag that sets the maximum times the validator tries to check the health of the beacon node before timing out. 0 or a negative number is indefinite. (the default is 0)
## Fixed
- Validator client shuts down cleanly on error instead of fatal error.
## Changed
- Previously, we optimistically believed the beacon node was healthy and tried to get chain start, but now we do a health check at the start.

View File

@@ -19,6 +19,8 @@ const (
WalletDefaultDirName = "prysm-wallet-v2"
// DefaultHTTPServerHost for the validator client.
DefaultHTTPServerHost = "127.0.0.1"
DefaultMaxHealthChecks = 0
)
var (
@@ -394,6 +396,13 @@ var (
Usage: "Disables polling of duties on dependent root changes.",
Value: false,
}
// MaxHealthChecksFlag sets a maximum amount of times to check for beacon node health before validator client times out and shuts down
MaxHealthChecksFlag = &cli.IntFlag{
Name: "max-health-checks",
Usage: "Maximum number of health checks to perform before exiting if not healthy. Set to 0 or a negative number for indefinite checks.",
Value: DefaultMaxHealthChecks,
}
)
// DefaultValidatorDir returns OS-specific default validator directory.

View File

@@ -76,6 +76,7 @@ var appFlags = []cli.Flag{
flags.EnableDistributed,
flags.AuthTokenPathFlag,
flags.DisableDutiesPolling,
flags.MaxHealthChecksFlag,
// Consensys' Web3Signer flags
flags.Web3SignerURLFlag,
flags.Web3SignerPublicValidatorKeysFlag,

View File

@@ -141,6 +141,7 @@ var appHelpFlagGroups = []flagGroup{
flags.EnableDistributed,
flags.AuthTokenPathFlag,
flags.DisableDutiesPolling,
flags.MaxHealthChecksFlag,
},
},
{

View File

@@ -425,10 +425,7 @@ func (r *testRunner) testDoppelGangerProtection(ctx context.Context) error {
if r.t.Failed() {
return errors.New("doppelganger was unable to be found")
}
// Expect an abrupt exit for the validator client.
if err := g.Wait(); err == nil || !strings.Contains(err.Error(), errGeneralCode) {
return fmt.Errorf("wanted an error of %s but received %v", errGeneralCode, err)
}
require.NoError(r.t, g.Wait())
return nil
}

View File

@@ -9,16 +9,18 @@ go_library(
"node_client_mock.go",
"prysm_chain_client_mock.go",
"validator_client_mock.go",
"validator_mock.go",
],
importpath = "github.com/OffchainLabs/prysm/v6/testing/validator-mock",
visibility = ["//visibility:public"],
deps = [
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//config/proposer:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/validator:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/keymanager:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@org_uber_go_mock//gomock:go_default_library",
],

View File

@@ -13,7 +13,6 @@ import (
context "context"
reflect "reflect"
health "github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
gomock "go.uber.org/mock/gomock"
emptypb "google.golang.org/protobuf/types/known/emptypb"
@@ -23,7 +22,6 @@ import (
type MockNodeClient struct {
ctrl *gomock.Controller
recorder *MockNodeClientMockRecorder
isgomock struct{}
}
// MockNodeClientMockRecorder is the mock recorder for MockNodeClient.
@@ -44,75 +42,75 @@ func (m *MockNodeClient) EXPECT() *MockNodeClientMockRecorder {
}
// Genesis mocks base method.
func (m *MockNodeClient) Genesis(ctx context.Context, in *emptypb.Empty) (*eth.Genesis, error) {
func (m *MockNodeClient) Genesis(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Genesis, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Genesis", ctx, in)
ret := m.ctrl.Call(m, "Genesis", arg0, arg1)
ret0, _ := ret[0].(*eth.Genesis)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Genesis indicates an expected call of Genesis.
func (mr *MockNodeClientMockRecorder) Genesis(ctx, in any) *gomock.Call {
func (mr *MockNodeClientMockRecorder) Genesis(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), ctx, in)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), arg0, arg1)
}
// HealthTracker mocks base method.
func (m *MockNodeClient) HealthTracker() health.Tracker {
// IsHealthy mocks base method.
func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HealthTracker")
ret0, _ := ret[0].(health.Tracker)
ret := m.ctrl.Call(m, "IsHealthy", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// HealthTracker indicates an expected call of HealthTracker.
func (mr *MockNodeClientMockRecorder) HealthTracker() *gomock.Call {
// IsHealthy indicates an expected call of IsHealthy.
func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HealthTracker", reflect.TypeOf((*MockNodeClient)(nil).HealthTracker))
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0)
}
// Peers mocks base method.
func (m *MockNodeClient) Peers(ctx context.Context, in *emptypb.Empty) (*eth.Peers, error) {
func (m *MockNodeClient) Peers(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Peers, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Peers", ctx, in)
ret := m.ctrl.Call(m, "Peers", arg0, arg1)
ret0, _ := ret[0].(*eth.Peers)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Peers indicates an expected call of Peers.
func (mr *MockNodeClientMockRecorder) Peers(ctx, in any) *gomock.Call {
func (mr *MockNodeClientMockRecorder) Peers(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockNodeClient)(nil).Peers), ctx, in)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockNodeClient)(nil).Peers), arg0, arg1)
}
// SyncStatus mocks base method.
func (m *MockNodeClient) SyncStatus(ctx context.Context, in *emptypb.Empty) (*eth.SyncStatus, error) {
func (m *MockNodeClient) SyncStatus(arg0 context.Context, arg1 *emptypb.Empty) (*eth.SyncStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncStatus", ctx, in)
ret := m.ctrl.Call(m, "SyncStatus", arg0, arg1)
ret0, _ := ret[0].(*eth.SyncStatus)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SyncStatus indicates an expected call of SyncStatus.
func (mr *MockNodeClientMockRecorder) SyncStatus(ctx, in any) *gomock.Call {
func (mr *MockNodeClientMockRecorder) SyncStatus(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncStatus", reflect.TypeOf((*MockNodeClient)(nil).SyncStatus), ctx, in)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncStatus", reflect.TypeOf((*MockNodeClient)(nil).SyncStatus), arg0, arg1)
}
// Version mocks base method.
func (m *MockNodeClient) Version(ctx context.Context, in *emptypb.Empty) (*eth.Version, error) {
func (m *MockNodeClient) Version(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Version, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Version", ctx, in)
ret := m.ctrl.Call(m, "Version", arg0, arg1)
ret0, _ := ret[0].(*eth.Version)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Version indicates an expected call of Version.
func (mr *MockNodeClientMockRecorder) Version(ctx, in any) *gomock.Call {
func (mr *MockNodeClientMockRecorder) Version(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Version", reflect.TypeOf((*MockNodeClient)(nil).Version), ctx, in)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Version", reflect.TypeOf((*MockNodeClient)(nil).Version), arg0, arg1)
}

536
testing/validator-mock/validator_mock.go generated Normal file
View File

@@ -0,0 +1,536 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/OffchainLabs/prysm/v6/validator/client/iface (interfaces: Validator)
//
// Generated by this command:
//
// mockgen -package=validator_mock -destination=testing/validator-mock/validator_mock.go github.com/OffchainLabs/prysm/v6/validator/client/iface Validator
//
// Package validator_mock is a generated GoMock package.
package validator_mock
import (
context "context"
reflect "reflect"
time "time"
event "github.com/OffchainLabs/prysm/v6/api/client/event"
proposer "github.com/OffchainLabs/prysm/v6/config/proposer"
primitives "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
iface "github.com/OffchainLabs/prysm/v6/validator/client/iface"
keymanager "github.com/OffchainLabs/prysm/v6/validator/keymanager"
gomock "go.uber.org/mock/gomock"
)
// MockValidator is a mock of Validator interface.
type MockValidator struct {
ctrl *gomock.Controller
recorder *MockValidatorMockRecorder
}
// MockValidatorMockRecorder is the mock recorder for MockValidator.
type MockValidatorMockRecorder struct {
mock *MockValidator
}
// NewMockValidator creates a new mock instance.
func NewMockValidator(ctrl *gomock.Controller) *MockValidator {
mock := &MockValidator{ctrl: ctrl}
mock.recorder = &MockValidatorMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockValidator) EXPECT() *MockValidatorMockRecorder {
return m.recorder
}
// AccountsChangedChan mocks base method.
func (m *MockValidator) AccountsChangedChan() <-chan [][48]byte {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AccountsChangedChan")
ret0, _ := ret[0].(<-chan [][48]byte)
return ret0
}
// AccountsChangedChan indicates an expected call of AccountsChangedChan.
func (mr *MockValidatorMockRecorder) AccountsChangedChan() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AccountsChangedChan", reflect.TypeOf((*MockValidator)(nil).AccountsChangedChan))
}
// CanonicalHeadSlot mocks base method.
func (m *MockValidator) CanonicalHeadSlot(arg0 context.Context) (primitives.Slot, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CanonicalHeadSlot", arg0)
ret0, _ := ret[0].(primitives.Slot)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CanonicalHeadSlot indicates an expected call of CanonicalHeadSlot.
func (mr *MockValidatorMockRecorder) CanonicalHeadSlot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanonicalHeadSlot", reflect.TypeOf((*MockValidator)(nil).CanonicalHeadSlot), arg0)
}
// CheckDoppelGanger mocks base method.
func (m *MockValidator) CheckDoppelGanger(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CheckDoppelGanger", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CheckDoppelGanger indicates an expected call of CheckDoppelGanger.
func (mr *MockValidatorMockRecorder) CheckDoppelGanger(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckDoppelGanger", reflect.TypeOf((*MockValidator)(nil).CheckDoppelGanger), arg0)
}
// DeleteGraffiti mocks base method.
func (m *MockValidator) DeleteGraffiti(arg0 context.Context, arg1 [48]byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteGraffiti", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteGraffiti indicates an expected call of DeleteGraffiti.
func (mr *MockValidatorMockRecorder) DeleteGraffiti(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteGraffiti", reflect.TypeOf((*MockValidator)(nil).DeleteGraffiti), arg0, arg1)
}
// Done mocks base method.
func (m *MockValidator) Done() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Done")
}
// Done indicates an expected call of Done.
func (mr *MockValidatorMockRecorder) Done() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockValidator)(nil).Done))
}
// EventStreamIsRunning mocks base method.
func (m *MockValidator) EventStreamIsRunning() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EventStreamIsRunning")
ret0, _ := ret[0].(bool)
return ret0
}
// EventStreamIsRunning indicates an expected call of EventStreamIsRunning.
func (mr *MockValidatorMockRecorder) EventStreamIsRunning() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventStreamIsRunning", reflect.TypeOf((*MockValidator)(nil).EventStreamIsRunning))
}
// EventsChan mocks base method.
func (m *MockValidator) EventsChan() <-chan *event.Event {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EventsChan")
ret0, _ := ret[0].(<-chan *event.Event)
return ret0
}
// EventsChan indicates an expected call of EventsChan.
func (mr *MockValidatorMockRecorder) EventsChan() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsChan", reflect.TypeOf((*MockValidator)(nil).EventsChan))
}
// FindHealthyHost mocks base method.
func (m *MockValidator) FindHealthyHost(arg0 context.Context) bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindHealthyHost", arg0)
ret0, _ := ret[0].(bool)
return ret0
}
// FindHealthyHost indicates an expected call of FindHealthyHost.
func (mr *MockValidatorMockRecorder) FindHealthyHost(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindHealthyHost", reflect.TypeOf((*MockValidator)(nil).FindHealthyHost), arg0)
}
// Graffiti mocks base method.
func (m *MockValidator) Graffiti(arg0 context.Context, arg1 [48]byte) ([]byte, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Graffiti", arg0, arg1)
ret0, _ := ret[0].([]byte)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Graffiti indicates an expected call of Graffiti.
func (mr *MockValidatorMockRecorder) Graffiti(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Graffiti", reflect.TypeOf((*MockValidator)(nil).Graffiti), arg0, arg1)
}
// HandleKeyReload mocks base method.
func (m *MockValidator) HandleKeyReload(arg0 context.Context, arg1 [][48]byte) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "HandleKeyReload", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// HandleKeyReload indicates an expected call of HandleKeyReload.
func (mr *MockValidatorMockRecorder) HandleKeyReload(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleKeyReload", reflect.TypeOf((*MockValidator)(nil).HandleKeyReload), arg0, arg1)
}
// Host mocks base method.
func (m *MockValidator) Host() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Host")
ret0, _ := ret[0].(string)
return ret0
}
// Host indicates an expected call of Host.
func (mr *MockValidatorMockRecorder) Host() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockValidator)(nil).Host))
}
// Keymanager mocks base method.
func (m *MockValidator) Keymanager() (keymanager.IKeymanager, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Keymanager")
ret0, _ := ret[0].(keymanager.IKeymanager)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Keymanager indicates an expected call of Keymanager.
func (mr *MockValidatorMockRecorder) Keymanager() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keymanager", reflect.TypeOf((*MockValidator)(nil).Keymanager))
}
// LogSubmittedAtts mocks base method.
func (m *MockValidator) LogSubmittedAtts(arg0 primitives.Slot) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "LogSubmittedAtts", arg0)
}
// LogSubmittedAtts indicates an expected call of LogSubmittedAtts.
func (mr *MockValidatorMockRecorder) LogSubmittedAtts(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogSubmittedAtts", reflect.TypeOf((*MockValidator)(nil).LogSubmittedAtts), arg0)
}
// LogSubmittedSyncCommitteeMessages mocks base method.
func (m *MockValidator) LogSubmittedSyncCommitteeMessages() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "LogSubmittedSyncCommitteeMessages")
}
// LogSubmittedSyncCommitteeMessages indicates an expected call of LogSubmittedSyncCommitteeMessages.
func (mr *MockValidatorMockRecorder) LogSubmittedSyncCommitteeMessages() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogSubmittedSyncCommitteeMessages", reflect.TypeOf((*MockValidator)(nil).LogSubmittedSyncCommitteeMessages))
}
// LogValidatorGainsAndLosses mocks base method.
func (m *MockValidator) LogValidatorGainsAndLosses(arg0 context.Context, arg1 primitives.Slot) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LogValidatorGainsAndLosses", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// LogValidatorGainsAndLosses indicates an expected call of LogValidatorGainsAndLosses.
func (mr *MockValidatorMockRecorder) LogValidatorGainsAndLosses(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogValidatorGainsAndLosses", reflect.TypeOf((*MockValidator)(nil).LogValidatorGainsAndLosses), arg0, arg1)
}
// NextSlot mocks base method.
func (m *MockValidator) NextSlot() <-chan primitives.Slot {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NextSlot")
ret0, _ := ret[0].(<-chan primitives.Slot)
return ret0
}
// NextSlot indicates an expected call of NextSlot.
func (mr *MockValidatorMockRecorder) NextSlot() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSlot", reflect.TypeOf((*MockValidator)(nil).NextSlot))
}
// ProcessEvent mocks base method.
func (m *MockValidator) ProcessEvent(arg0 context.Context, arg1 *event.Event) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ProcessEvent", arg0, arg1)
}
// ProcessEvent indicates an expected call of ProcessEvent.
func (mr *MockValidatorMockRecorder) ProcessEvent(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessEvent", reflect.TypeOf((*MockValidator)(nil).ProcessEvent), arg0, arg1)
}
// ProposeBlock mocks base method.
func (m *MockValidator) ProposeBlock(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "ProposeBlock", arg0, arg1, arg2)
}
// ProposeBlock indicates an expected call of ProposeBlock.
func (mr *MockValidatorMockRecorder) ProposeBlock(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeBlock", reflect.TypeOf((*MockValidator)(nil).ProposeBlock), arg0, arg1, arg2)
}
// ProposerSettings mocks base method.
func (m *MockValidator) ProposerSettings() *proposer.Settings {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ProposerSettings")
ret0, _ := ret[0].(*proposer.Settings)
return ret0
}
// ProposerSettings indicates an expected call of ProposerSettings.
func (mr *MockValidatorMockRecorder) ProposerSettings() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposerSettings", reflect.TypeOf((*MockValidator)(nil).ProposerSettings))
}
// PushProposerSettings mocks base method.
func (m *MockValidator) PushProposerSettings(arg0 context.Context, arg1 primitives.Slot, arg2 bool) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PushProposerSettings", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// PushProposerSettings indicates an expected call of PushProposerSettings.
func (mr *MockValidatorMockRecorder) PushProposerSettings(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushProposerSettings", reflect.TypeOf((*MockValidator)(nil).PushProposerSettings), arg0, arg1, arg2)
}
// RolesAt mocks base method.
func (m *MockValidator) RolesAt(arg0 context.Context, arg1 primitives.Slot) (map[[48]byte][]iface.ValidatorRole, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RolesAt", arg0, arg1)
ret0, _ := ret[0].(map[[48]byte][]iface.ValidatorRole)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// RolesAt indicates an expected call of RolesAt.
func (mr *MockValidatorMockRecorder) RolesAt(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RolesAt", reflect.TypeOf((*MockValidator)(nil).RolesAt), arg0, arg1)
}
// SetGraffiti mocks base method.
func (m *MockValidator) SetGraffiti(arg0 context.Context, arg1 [48]byte, arg2 []byte) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetGraffiti", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// SetGraffiti indicates an expected call of SetGraffiti.
func (mr *MockValidatorMockRecorder) SetGraffiti(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetGraffiti", reflect.TypeOf((*MockValidator)(nil).SetGraffiti), arg0, arg1, arg2)
}
// SetProposerSettings mocks base method.
func (m *MockValidator) SetProposerSettings(arg0 context.Context, arg1 *proposer.Settings) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SetProposerSettings", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SetProposerSettings indicates an expected call of SetProposerSettings.
func (mr *MockValidatorMockRecorder) SetProposerSettings(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProposerSettings", reflect.TypeOf((*MockValidator)(nil).SetProposerSettings), arg0, arg1)
}
// SignValidatorRegistrationRequest mocks base method.
func (m *MockValidator) SignValidatorRegistrationRequest(arg0 context.Context, arg1 iface.SigningFunc, arg2 *eth.ValidatorRegistrationV1) (*eth.SignedValidatorRegistrationV1, bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SignValidatorRegistrationRequest", arg0, arg1, arg2)
ret0, _ := ret[0].(*eth.SignedValidatorRegistrationV1)
ret1, _ := ret[1].(bool)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// SignValidatorRegistrationRequest indicates an expected call of SignValidatorRegistrationRequest.
func (mr *MockValidatorMockRecorder) SignValidatorRegistrationRequest(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignValidatorRegistrationRequest", reflect.TypeOf((*MockValidator)(nil).SignValidatorRegistrationRequest), arg0, arg1, arg2)
}
// SlotDeadline mocks base method.
func (m *MockValidator) SlotDeadline(arg0 primitives.Slot) time.Time {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SlotDeadline", arg0)
ret0, _ := ret[0].(time.Time)
return ret0
}
// SlotDeadline indicates an expected call of SlotDeadline.
func (mr *MockValidatorMockRecorder) SlotDeadline(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SlotDeadline", reflect.TypeOf((*MockValidator)(nil).SlotDeadline), arg0)
}
// StartEventStream mocks base method.
func (m *MockValidator) StartEventStream(arg0 context.Context, arg1 []string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "StartEventStream", arg0, arg1)
}
// StartEventStream indicates an expected call of StartEventStream.
func (mr *MockValidatorMockRecorder) StartEventStream(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidator)(nil).StartEventStream), arg0, arg1)
}
// SubmitAggregateAndProof mocks base method.
func (m *MockValidator) SubmitAggregateAndProof(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SubmitAggregateAndProof", arg0, arg1, arg2)
}
// SubmitAggregateAndProof indicates an expected call of SubmitAggregateAndProof.
func (mr *MockValidatorMockRecorder) SubmitAggregateAndProof(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitAggregateAndProof", reflect.TypeOf((*MockValidator)(nil).SubmitAggregateAndProof), arg0, arg1, arg2)
}
// SubmitAttestation mocks base method.
func (m *MockValidator) SubmitAttestation(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SubmitAttestation", arg0, arg1, arg2)
}
// SubmitAttestation indicates an expected call of SubmitAttestation.
func (mr *MockValidatorMockRecorder) SubmitAttestation(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitAttestation", reflect.TypeOf((*MockValidator)(nil).SubmitAttestation), arg0, arg1, arg2)
}
// SubmitSignedContributionAndProof mocks base method.
func (m *MockValidator) SubmitSignedContributionAndProof(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SubmitSignedContributionAndProof", arg0, arg1, arg2)
}
// SubmitSignedContributionAndProof indicates an expected call of SubmitSignedContributionAndProof.
func (mr *MockValidatorMockRecorder) SubmitSignedContributionAndProof(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitSignedContributionAndProof", reflect.TypeOf((*MockValidator)(nil).SubmitSignedContributionAndProof), arg0, arg1, arg2)
}
// SubmitSyncCommitteeMessage mocks base method.
func (m *MockValidator) SubmitSyncCommitteeMessage(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SubmitSyncCommitteeMessage", arg0, arg1, arg2)
}
// SubmitSyncCommitteeMessage indicates an expected call of SubmitSyncCommitteeMessage.
func (mr *MockValidatorMockRecorder) SubmitSyncCommitteeMessage(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitSyncCommitteeMessage", reflect.TypeOf((*MockValidator)(nil).SubmitSyncCommitteeMessage), arg0, arg1, arg2)
}
// UpdateDomainDataCaches mocks base method.
func (m *MockValidator) UpdateDomainDataCaches(arg0 context.Context, arg1 primitives.Slot) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateDomainDataCaches", arg0, arg1)
}
// UpdateDomainDataCaches indicates an expected call of UpdateDomainDataCaches.
func (mr *MockValidatorMockRecorder) UpdateDomainDataCaches(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDomainDataCaches", reflect.TypeOf((*MockValidator)(nil).UpdateDomainDataCaches), arg0, arg1)
}
// UpdateDuties mocks base method.
func (m *MockValidator) UpdateDuties(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateDuties", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateDuties indicates an expected call of UpdateDuties.
func (mr *MockValidatorMockRecorder) UpdateDuties(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDuties", reflect.TypeOf((*MockValidator)(nil).UpdateDuties), arg0)
}
// WaitForActivation mocks base method.
func (m *MockValidator) WaitForActivation(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WaitForActivation", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// WaitForActivation indicates an expected call of WaitForActivation.
func (mr *MockValidatorMockRecorder) WaitForActivation(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForActivation", reflect.TypeOf((*MockValidator)(nil).WaitForActivation), arg0)
}
// WaitForChainStart mocks base method.
func (m *MockValidator) WaitForChainStart(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WaitForChainStart", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// WaitForChainStart indicates an expected call of WaitForChainStart.
func (mr *MockValidatorMockRecorder) WaitForChainStart(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForChainStart", reflect.TypeOf((*MockValidator)(nil).WaitForChainStart), arg0)
}
// WaitForKeymanagerInitialization mocks base method.
func (m *MockValidator) WaitForKeymanagerInitialization(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WaitForKeymanagerInitialization", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// WaitForKeymanagerInitialization indicates an expected call of WaitForKeymanagerInitialization.
func (mr *MockValidatorMockRecorder) WaitForKeymanagerInitialization(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForKeymanagerInitialization", reflect.TypeOf((*MockValidator)(nil).WaitForKeymanagerInitialization), arg0)
}
// WaitForSync mocks base method.
func (m *MockValidator) WaitForSync(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WaitForSync", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// WaitForSync indicates an expected call of WaitForSync.
func (mr *MockValidatorMockRecorder) WaitForSync(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForSync", reflect.TypeOf((*MockValidator)(nil).WaitForSync), arg0)
}

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"aggregate.go",
"attest.go",
"health_monitor.go",
"key_reload.go",
"log.go",
"metrics.go",
@@ -24,7 +25,6 @@ go_library(
],
deps = [
"//api/client:go_default_library",
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//api/grpc:go_default_library",
"//api/server/structs:go_default_library",
@@ -104,6 +104,7 @@ go_test(
srcs = [
"aggregate_test.go",
"attest_test.go",
"health_monitor_test.go",
"key_reload_test.go",
"log_test.go",
"metrics_test.go",
@@ -121,7 +122,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//api/client/beacon/health:go_default_library",
"//api/server/structs:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/core/signing:go_default_library",
@@ -169,6 +169,8 @@ go_test(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
"@com_github_tyler_smith_go_bip39//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@com_github_wealdtech_go_eth2_util//:go_default_library",

View File

@@ -42,7 +42,6 @@ go_library(
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",

View File

@@ -4,7 +4,6 @@ import (
"context"
"strconv"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/validator/client/iface"
@@ -22,7 +21,6 @@ type beaconApiNodeClient struct {
fallbackClient iface.NodeClient
jsonRestHandler RestHandler
genesisProvider GenesisProvider
healthTracker health.Tracker
}
func (c *beaconApiNodeClient) SyncStatus(ctx context.Context, _ *empty.Empty) (*ethpb.SyncStatus, error) {
@@ -104,11 +102,11 @@ func (c *beaconApiNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethp
}
func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil
}
func (c *beaconApiNodeClient) HealthTracker() health.Tracker {
return c.healthTracker
if err := c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil); err != nil {
log.WithError(err).Error("failed to get health of node")
return false
}
return true
}
func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
@@ -117,6 +115,5 @@ func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface
fallbackClient: fallbackClient,
genesisProvider: &beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
}
b.healthTracker = health.NewTracker(b)
return b
}

View File

@@ -12,7 +12,6 @@ go_library(
visibility = ["//validator:__subpackages__"],
deps = [
"//api/client:go_default_library",
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/rpc/eth/helpers:go_default_library",

View File

@@ -3,7 +3,6 @@ package grpc_api
import (
"context"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/validator/client/iface"
"github.com/golang/protobuf/ptypes/empty"
@@ -16,8 +15,7 @@ var (
)
type grpcNodeClient struct {
nodeClient ethpb.NodeClient
healthTracker health.Tracker
nodeClient ethpb.NodeClient
}
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
@@ -39,18 +37,13 @@ func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Pee
func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool {
_, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Debug("failed to get health of node")
log.WithError(err).Error("failed to get health of node")
return false
}
return true
}
func (c *grpcNodeClient) HealthTracker() health.Tracker {
return c.healthTracker
}
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
g.healthTracker = health.NewTracker(g)
return g
}

View File

@@ -0,0 +1,107 @@
package client
import (
"context"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/validator/client/iface"
"github.com/sirupsen/logrus"
)
type healthMonitor struct {
ctx context.Context
cancel context.CancelFunc
v iface.Validator
maxFails int
healthyCh chan bool // emits true → healthy, false → unhealthy
healthEventFeed *event.Feed
fails int
isHealthy bool
sync.RWMutex
}
// newHealthMonitor
func newHealthMonitor(
parentCtx context.Context,
parentCancel context.CancelFunc,
maxFails int,
v iface.Validator,
) *healthMonitor {
m := &healthMonitor{
ctx: parentCtx,
cancel: parentCancel,
maxFails: maxFails,
v: v,
healthyCh: make(chan bool),
healthEventFeed: new(event.Feed),
}
m.healthEventFeed.Subscribe(m.healthyCh)
return m
}
func (m *healthMonitor) IsHealthy() bool {
m.RLock()
defer m.RUnlock()
return m.isHealthy
}
func (m *healthMonitor) performHealthCheck() {
ishealthy := m.v.FindHealthyHost(m.ctx)
m.Lock()
defer m.Unlock()
if ishealthy {
m.fails = 0
} else if m.maxFails > 0 && m.fails < m.maxFails {
log.WithFields(logrus.Fields{
"fails": m.fails,
"maxFails": m.maxFails,
}).Warn("Failed health check, beacon node is unresponsive")
m.fails++
} else if m.maxFails > 0 && m.fails >= m.maxFails {
log.WithField("maxFails", m.maxFails).Warn("Maximum health checks reached. Stopping health check routine")
m.isHealthy = ishealthy
m.cancel()
return
}
if ishealthy == m.isHealthy {
// is not a new status so skip update
log.WithField("isHealthy", m.isHealthy).Debug("Health status did not change")
return
}
log.WithFields(logrus.Fields{
"healthy": ishealthy,
"previously": m.isHealthy,
}).Info("Health status changed")
m.isHealthy = ishealthy
go m.healthEventFeed.Send(ishealthy) // non blocking send
}
func (m *healthMonitor) loop() {
log.Debug("Starting health check routine for beacon node apis")
interval := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ticker := time.NewTicker(interval)
for ; true; <-ticker.C { // check immediately
if m.ctx.Err() != nil {
log.Debug("Context canceled, stopping health checking")
return
}
m.performHealthCheck()
}
}
// Start launches the monitor loop (non-blocking).
func (m *healthMonitor) Start() {
go m.loop()
}
// Stop terminates the monitor and closes its channel.
func (m *healthMonitor) Stop() {
m.cancel()
}
// HealthyChan exposes liveness updates; the channel closes when Stop() is called.
func (m *healthMonitor) HealthyChan() <-chan bool { return m.healthyCh }

View File

@@ -0,0 +1,255 @@
package client
import (
"context"
"sync"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"github.com/OffchainLabs/prysm/v6/config/params"
validatormock "github.com/OffchainLabs/prysm/v6/testing/validator-mock"
)
// TestHealthMonitor_IsHealthy_Concurrency tests thread-safety of IsHealthy.
func TestHealthMonitor_IsHealthy_Concurrency(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockValidator := validatormock.NewMockValidator(ctrl)
// inside the test
parentCtx, parentCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
t.Cleanup(parentCancel)
// Expectation for newHealthMonitor's FindHealthyHost call
mockValidator.EXPECT().FindHealthyHost(gomock.Any()).Return(true).Times(1)
monitor := newHealthMonitor(parentCtx, parentCancel, 3, mockValidator)
require.NotNil(t, monitor)
monitor.Start()
time.Sleep(100 * time.Millisecond)
var wg sync.WaitGroup
numGoroutines := 10
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.True(t, monitor.IsHealthy())
}()
}
wg.Wait()
// Test when isHealthy is false
monitor.Lock()
monitor.isHealthy = false
monitor.Unlock()
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
assert.False(t, monitor.IsHealthy())
}()
}
wg.Wait()
}
// TestHealthMonitor_PerformHealthCheck tests the core logic of a single health check.
func TestHealthMonitor_PerformHealthCheck(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockValidator := validatormock.NewMockValidator(ctrl)
tests := []struct {
expectStatusUpdate bool // true if healthyCh should receive a new, different status
expectCancelCalled bool
expectedIsHealthy bool
findHealthyHostReturns bool
initialIsHealthy bool
expectedFails int
maxFails int
initialFails int
name string
}{
{
name: "Becomes Unhealthy",
initialIsHealthy: true,
initialFails: 0,
maxFails: 3,
findHealthyHostReturns: false,
expectedIsHealthy: false,
expectedFails: 1,
expectCancelCalled: false,
expectStatusUpdate: true,
},
{
name: "Becomes Healthy",
initialIsHealthy: false,
initialFails: 1,
maxFails: 3,
findHealthyHostReturns: true,
expectedIsHealthy: true,
expectedFails: 0,
expectCancelCalled: false,
expectStatusUpdate: true,
},
{
name: "Remains Healthy",
initialIsHealthy: true,
initialFails: 0,
maxFails: 3,
findHealthyHostReturns: true,
expectedIsHealthy: true,
expectedFails: 0,
expectCancelCalled: false,
expectStatusUpdate: false, // Status did not change
},
{
name: "Remains Unhealthy",
initialIsHealthy: false,
initialFails: 1,
maxFails: 3,
findHealthyHostReturns: false,
expectedIsHealthy: false,
expectedFails: 2,
expectCancelCalled: false,
expectStatusUpdate: false, // Status did not change
},
{
name: "Max Fails Reached - Stays Unhealthy and Cancels",
initialIsHealthy: false,
initialFails: 2, // One fail away from maxFails
maxFails: 2,
findHealthyHostReturns: false,
expectedIsHealthy: false,
expectedFails: 2,
expectCancelCalled: true,
expectStatusUpdate: false, // Status was already false, no new update sent before cancel
},
{
name: "MaxFails is 0 - Remains Unhealthy, No Cancel",
initialIsHealthy: false,
initialFails: 100, // Arbitrarily high
maxFails: 0, // Infinite
findHealthyHostReturns: false,
expectedIsHealthy: false,
expectedFails: 100,
expectCancelCalled: false,
expectStatusUpdate: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
monitorCtx, monitorCancelFunc := context.WithCancel(context.Background())
var actualCancelFuncCalled bool
testCancelCallback := func() {
actualCancelFuncCalled = true
monitorCancelFunc() // Propagate to monitorCtx if needed for other parts
}
monitor := &healthMonitor{
ctx: monitorCtx, // Context for the monitor's operations
cancel: testCancelCallback, // This is m.cancel()
v: mockValidator,
maxFails: tt.maxFails,
healthyCh: make(chan bool, 1),
fails: tt.initialFails,
isHealthy: tt.initialIsHealthy,
healthEventFeed: new(event.Feed),
}
monitor.healthEventFeed.Subscribe(monitor.healthyCh)
mockValidator.EXPECT().FindHealthyHost(gomock.Any()).Return(tt.findHealthyHostReturns)
monitor.performHealthCheck()
assert.Equal(t, tt.expectedIsHealthy, monitor.IsHealthy(), "isHealthy mismatch")
assert.Equal(t, tt.expectedFails, monitor.fails, "fails count mismatch")
assert.Equal(t, tt.expectCancelCalled, actualCancelFuncCalled, "cancelCalled mismatch")
if tt.expectStatusUpdate {
assert.Eventually(t, func() bool {
select {
case s := <-monitor.HealthyChan():
return s == tt.expectedIsHealthy
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond) // wait, poll
} else {
assert.Never(t, func() bool {
select {
case <-monitor.HealthyChan():
return true // received something: fail
default:
return false
}
}, 100*time.Millisecond, 10*time.Millisecond)
}
if !actualCancelFuncCalled {
monitorCancelFunc() // Clean up context if not cancelled by test logic
}
})
}
}
// TestHealthMonitor_HealthyChan_ReceivesUpdates tests channel behavior.
func TestHealthMonitor_HealthyChan_ReceivesUpdates(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockValidator := validatormock.NewMockValidator(ctrl)
monitorCtx, monitorCancelFunc := context.WithCancel(context.Background())
originalSecPerSlot := params.BeaconConfig().SecondsPerSlot
params.BeaconConfig().SecondsPerSlot = 1 // 1 sec interval for test
defer func() {
params.BeaconConfig().SecondsPerSlot = originalSecPerSlot
monitorCancelFunc() // Ensure monitor context is cleaned up
}()
monitor := newHealthMonitor(monitorCtx, monitorCancelFunc, 3, mockValidator)
require.NotNil(t, monitor)
ch := monitor.HealthyChan()
require.NotNil(t, ch)
first := mockValidator.EXPECT().
FindHealthyHost(gomock.Any()).
Return(true).Times(1)
mockValidator.EXPECT().
FindHealthyHost(gomock.Any()).
Return(false).
AnyTimes().
After(first)
monitor.Start()
// Consume initial prime value (true)
select {
case status := <-ch:
assert.True(t, status, "Expected initial status to be true")
case <-time.After(100 * time.Millisecond):
t.Fatal("Timeout waiting for initial status")
}
// Expect 'false' from the first check in Start's loop
select {
case status := <-ch:
assert.False(t, status, "Expected status to change to false")
case <-time.After(2 * time.Second): // Timeout for tick + processing
t.Fatal("Timeout waiting for status change to false")
}
// 4. Stop the monitor
monitor.Stop() // This calls monitorCancelFunc
}

View File

@@ -12,7 +12,6 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v6/validator/client/iface",
visibility = ["//visibility:public"],
deps = [
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//config/fieldparams:go_default_library",
"//config/proposer:go_default_library",

View File

@@ -3,7 +3,6 @@ package iface
import (
"context"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/golang/protobuf/ptypes/empty"
)
@@ -13,5 +12,5 @@ type NodeClient interface {
Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
HealthTracker() health.Tracker
IsHealthy(ctx context.Context) bool
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"time"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
"github.com/OffchainLabs/prysm/v6/api/client/event"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/proposer"
@@ -69,9 +68,8 @@ type Validator interface {
Graffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) ([]byte, error)
SetGraffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte, graffiti []byte) error
DeleteGraffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) error
HealthTracker() health.Tracker
Host() string
ChangeHost()
FindHealthyHost(ctx context.Context) bool
}
// SigningFunc interface defines a type for the function that signs a message

View File

@@ -7,8 +7,6 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/api/client"
"github.com/OffchainLabs/prysm/v6/api/client/event"
"github.com/OffchainLabs/prysm/v6/config/features"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -25,24 +23,26 @@ import (
// Time to wait before trying to reconnect with beacon node.
var backOffPeriod = 10 * time.Second
// Run the main validator routine. This routine exits if the context is
// canceled.
// runner encapsulates the main validator routine.
type runner struct {
validator iface.Validator
healthMonitor *healthMonitor
}
// newRunner creates a new runner instance and performs all necessary initialization.
// This function can return an error if initialization fails.
//
// Order of operations:
// 1 - Initialize validator data
// 2 - Wait for validator activation
// 3 - Wait for the next slot start
// 4 - Update assignments
// 5 - Determine role at current slot
// 6 - Perform assigned role, if any
func run(ctx context.Context, v iface.Validator) {
cleanup := v.Done
defer cleanup()
func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (*runner, error) {
// Initialize validator and get head slot
headSlot, err := initializeValidatorAndGetHeadSlot(ctx, v)
if err != nil {
return // Exit if context is canceled.
v.Done()
return nil, err
}
// Prepare initial duties update
ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1))
if err != nil {
log.WithError(err).Error("Failed to get epoch start")
@@ -51,11 +51,10 @@ func run(ctx context.Context, v iface.Validator) {
startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
startCtx, startCancel := context.WithDeadline(ctx, startDeadline)
if err := v.UpdateDuties(startCtx); err != nil {
// Don't return error here, just log it
handleAssignmentError(err, headSlot)
}
startCancel()
healthTracker := v.HealthTracker()
runHealthCheckRoutine(ctx, v)
// check if proposer settings is still nil
// Set properties on the beacon node like the fee recipient for validators that are being used & active.
@@ -64,16 +63,38 @@ func run(ctx context.Context, v iface.Validator) {
" and will continue to use settings provided in the beacon node.")
}
if err := v.PushProposerSettings(ctx, headSlot, true); err != nil {
log.WithError(err).Fatal("Failed to update proposer settings")
v.Done()
return nil, errors.Wrap(err, "failed to update proposer settings")
}
return &runner{
validator: v,
healthMonitor: monitor,
}, nil
}
// run executes the main validator routine. This routine exits if the context is
// canceled. It returns a channel that will be closed when the routine exits.
//
// Order of operations:
// 1 - Wait for the next slot start
// 2 - Update assignments if needed
// 3 - Determine role at current slot
// 4 - Perform assigned role, if any
func (r *runner) run(ctx context.Context) {
v := r.validator
cleanup := v.Done
defer cleanup()
for {
select {
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
return // Exit if context is canceled.
case slot := <-v.NextSlot():
if !healthTracker.IsHealthy(ctx) {
continue
if !r.healthMonitor.IsHealthy() {
log.Warn("Beacon node unhealthy, stopping runner")
return
}
deadline := v.SlotDeadline(slot)
@@ -126,27 +147,6 @@ func run(ctx context.Context, v iface.Validator) {
// performRoles calls span.End()
rolesCtx, _ := context.WithDeadline(ctx, deadline)
performRoles(rolesCtx, allRoles, v, slot, &wg, span)
case isHealthyAgain := <-healthTracker.HealthUpdates():
if isHealthyAgain {
headSlot, err = initializeValidatorAndGetHeadSlot(ctx, v)
if err != nil {
log.WithError(err).Error("Failed to re initialize validator and get head slot")
continue
}
ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1))
if err != nil {
log.WithError(err).Error("Failed to get epoch start")
continue
}
deadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1)
dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline)
if err := v.UpdateDuties(dutiesCtx); err != nil {
handleAssignmentError(err, headSlot)
dutiesCancel()
continue
}
dutiesCancel()
}
case e := <-v.EventsChan():
v.ProcessEvent(ctx, e)
case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot
@@ -203,13 +203,11 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
log.WithError(err).Fatal("Could not determine if beacon chain started")
return 0, errors.Wrap(err, "could not determine if beacon chain started")
}
if err := v.WaitForKeymanagerInitialization(ctx); err != nil {
// log.Fatal will prevent defer from being called
v.Done()
log.WithError(err).Fatal("Wallet is not ready")
return 0, errors.Wrap(err, "Wallet is not ready")
}
if err := v.WaitForSync(ctx); err != nil {
@@ -218,21 +216,21 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
log.WithError(err).Fatal("Could not determine if beacon node synced")
return 0, errors.Wrap(err, "could not determine if beacon node synced")
}
if err := v.WaitForActivation(ctx); err != nil {
log.WithError(err).Fatal("Could not wait for validator activation")
return 0, errors.Wrap(err, "could not wait for validator activation")
}
headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) {
log.WithError(err).Warn("Could not get current canonical head slot")
log.WithError(err).Warn("could not get current canonical head slot")
continue
}
if err != nil {
log.WithError(err).Fatal("Could not get current canonical head slot")
return 0, errors.Wrap(err, "could not get current canonical head slot")
}
if err := v.CheckDoppelGanger(ctx); err != nil {
@@ -241,7 +239,7 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
continue
}
log.WithError(err).Fatal("Could not succeed with doppelganger check")
return 0, errors.Wrap(err, "could not succeed with doppelganger check")
}
break
}
@@ -309,40 +307,3 @@ func handleAssignmentError(err error, slot primitives.Slot) {
log.WithError(err).Error("Failed to update assignments")
}
}
func runHealthCheckRoutine(ctx context.Context, v iface.Validator) {
log.Info("Starting health check routine for beacon node apis")
healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
tracker := v.HealthTracker()
go func() {
// trigger the healthcheck immediately the first time
for ; true; <-healthCheckTicker.C {
if ctx.Err() != nil {
log.WithError(ctx.Err()).Error("Context cancelled")
return
}
isHealthy := tracker.CheckHealth(ctx)
if !isHealthy && features.Get().EnableBeaconRESTApi {
v.ChangeHost()
if !tracker.CheckHealth(ctx) {
continue // Skip to the next ticker
}
slot, err := v.CanonicalHeadSlot(ctx)
if err != nil {
log.WithError(err).Error("Could not get canonical head slot")
return
}
if err := v.PushProposerSettings(ctx, slot, true); err != nil {
log.WithError(err).Warn("Failed to update proposer settings")
}
}
// in case of node returning healthy but event stream died
if isHealthy && !v.EventStreamIsRunning() {
log.Info("Event stream reconnecting...")
go v.StartEventStream(ctx, event.DefaultEventTopics)
}
}
}()
}

View File

@@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/OffchainLabs/prysm/v6/cache/lru"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
@@ -42,47 +41,46 @@ func cancelledContext() context.Context {
return ctx
}
// Helper function to run the validator runner for tests
func runTest(t *testing.T, ctx context.Context, v iface.Validator) {
r, err := newRunner(ctx, v, &healthMonitor{isHealthy: true})
require.NoError(t, err)
r.run(ctx)
}
func TestCancelledContext_CleansUpValidator(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
Tracker: tracker,
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
}
run(cancelledContext(), v)
runTest(t, cancelledContext(), v)
assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
}
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
Tracker: tracker,
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
}
run(cancelledContext(), v)
runTest(t, cancelledContext(), v)
assert.Equal(t, 1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
}
func TestRetry_On_ConnectionError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
retry := 10
node.EXPECT().IsHealthy(gomock.Any()).Return(true)
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
Tracker: tracker,
RetryTillSuccess: retry,
}
backOffPeriod = 10 * time.Millisecond
ctx, cancel := context.WithCancel(t.Context())
go run(ctx, v)
go runTest(t, ctx, v)
// each step will fail (retry times)=10 this sleep times will wait more then
// the time it takes for all steps to succeed before main loop.
time.Sleep(time.Duration(retry*6) * backOffPeriod)
@@ -97,25 +95,17 @@ func TestRetry_On_ConnectionError(t *testing.T) {
func TestCancelledContext_WaitsForActivation(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
Tracker: tracker,
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
}
run(cancelledContext(), v)
runTest(t, cancelledContext(), v)
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}
func TestUpdateDuties_NextSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -127,7 +117,7 @@ func TestUpdateDuties_NextSlot(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
require.Equal(t, true, v.UpdateDutiesCalled, "Expected UpdateAssignments(%d) to be called", slot)
}
@@ -136,12 +126,8 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -154,7 +140,7 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
}()
v.UpdateDutiesRet = errors.New("bad")
run(ctx, v)
runTest(t, ctx, v)
require.LogsContain(t, hook, "Failed to update assignments")
}
@@ -162,12 +148,8 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
func TestRoleAt_NextSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -179,7 +161,7 @@ func TestRoleAt_NextSlot(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
require.Equal(t, true, v.RoleAtCalled, "Expected RoleAt(%d) to be called", slot)
assert.Equal(t, uint64(slot), v.RoleAtArg1, "RoleAt called with the wrong arg")
@@ -188,13 +170,9 @@ func TestRoleAt_NextSlot(t *testing.T) {
func TestAttests_NextSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
attSubmitted := make(chan interface{})
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker, AttSubmitted: attSubmitted}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, AttSubmitted: attSubmitted}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -206,7 +184,7 @@ func TestAttests_NextSlot(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
<-attSubmitted
require.Equal(t, true, v.AttestToBlockHeadCalled, "SubmitAttestation(%d) was not called", slot)
assert.Equal(t, uint64(slot), v.AttestToBlockHeadArg1, "SubmitAttestation was called with wrong arg")
@@ -215,13 +193,8 @@ func TestAttests_NextSlot(t *testing.T) {
func TestProposes_NextSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
blockProposed := make(chan interface{})
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker, BlockProposed: blockProposed}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, BlockProposed: blockProposed}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -233,7 +206,7 @@ func TestProposes_NextSlot(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
<-blockProposed
require.Equal(t, true, v.ProposeBlockCalled, "ProposeBlock(%d) was not called", slot)
@@ -243,14 +216,10 @@ func TestProposes_NextSlot(t *testing.T) {
func TestBothProposesAndAttests_NextSlot(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
_ = tracker.CheckHealth(t.Context())
blockProposed := make(chan interface{})
attSubmitted := make(chan interface{})
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker, BlockProposed: blockProposed, AttSubmitted: attSubmitted}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, BlockProposed: blockProposed, AttSubmitted: attSubmitted}
ctx, cancel := context.WithCancel(t.Context())
slot := primitives.Slot(55)
@@ -262,7 +231,7 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
<-blockProposed
<-attSubmitted
require.Equal(t, true, v.AttestToBlockHeadCalled, "SubmitAttestation(%d) was not called", slot)
@@ -276,10 +245,8 @@ func TestKeyReload_ActiveKey(t *testing.T) {
km := &mockKeymanager{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
v := &testutil.FakeValidator{Km: km, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
current := [][fieldparams.BLSPubkeyLength]byte{testutil.ActiveKey}
onAccountsChanged(ctx, v, current)
assert.Equal(t, true, v.HandleKeyReloadCalled)
@@ -294,10 +261,8 @@ func TestKeyReload_NoActiveKey(t *testing.T) {
km := &mockKeymanager{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
v := &testutil.FakeValidator{Km: km, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)}
current := [][fieldparams.BLSPubkeyLength]byte{na}
onAccountsChanged(ctx, v, current)
assert.Equal(t, true, v.HandleKeyReloadCalled)
@@ -320,10 +285,8 @@ func notActive(t *testing.T) [fieldparams.BLSPubkeyLength]byte {
func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
err := v.SetProposerSettings(t.Context(), &proposer.Settings{
DefaultConfig: &proposer.Option{
FeeRecipientConfig: &proposer.FeeRecipientConfig{
@@ -343,20 +306,17 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
assert.LogsContain(t, hook, "updated proposer settings")
}
func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
node := health.NewMockHealthClient(ctrl)
tracker := health.NewTracker(node)
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second,
Tracker: tracker,
}
err := v.SetProposerSettings(t.Context(), &proposer.Settings{
DefaultConfig: &proposer.Option{
@@ -376,7 +336,7 @@ func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
cancel()
}()
run(ctx, v)
runTest(t, ctx, v)
// can't test "Failed to update proposer settings" because of log.fatal
assert.LogsContain(t, hook, "Mock updated proposer settings")
}
@@ -566,11 +526,9 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
defer assertValidContext(t, timedCtx, ctx)
delay(t)
}).AnyTimes()
hcm := health.NewMockHealthClient(ctrl)
hcm.EXPECT().IsHealthy(liveCtx).Return(true).AnyTimes().Do(func(_ any) { delay(t) })
ncm := validatormock.NewMockNodeClient(ctrl)
ncm.EXPECT().SyncStatus(liveCtx, gomock.Any()).Return(&ethpb.SyncStatus{Syncing: false}, nil)
ncm.EXPECT().HealthTracker().Return(health.NewTracker(hcm)).AnyTimes()
ccm := validatormock.NewMockChainClient(ctrl)
ccm.EXPECT().ChainHead(liveCtx, gomock.Any()).Return(&ethpb.ChainHead{}, nil).Do(func(_, _ any) { delay(t) })
@@ -606,5 +564,5 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
submittedAggregates: make(map[submittedAttKey]*submittedAtt),
}
run(timedCtx, v)
runTest(t, timedCtx, v)
}

View File

@@ -55,12 +55,14 @@ type ValidatorService struct {
interopKeysConfig *local.InteropKeymanagerConfig
web3SignerConfig *remoteweb3signer.SetupConfig
proposerSettings *proposer.Settings
maxHealthChecks int
validatorsRegBatchSize int
enableAPI bool
emitAccountMetrics bool
logValidatorPerformance bool
distributed bool
disableDutiesPolling bool
closeClientFunc func() // validator client stop function is used here
}
// Config for the validator service.
@@ -69,6 +71,7 @@ type Config struct {
DB db.Database
Wallet *wallet.Wallet
WalletInitializedFeed *event.Feed
MaxHealthChecks int
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
GRPCRetryDelay time.Duration
@@ -88,6 +91,7 @@ type Config struct {
EmitAccountMetrics bool
Distributed bool
DisableDutiesPolling bool
CloseClientFunc func()
}
// NewValidatorService creates a new validator service for the service
@@ -112,6 +116,8 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
logValidatorPerformance: cfg.LogValidatorPerformance,
distributed: cfg.Distributed,
disableDutiesPolling: cfg.DisableDutiesPolling,
closeClientFunc: cfg.CloseClientFunc,
maxHealthChecks: cfg.MaxHealthChecks,
}
dialOpts := ConstructDialOptions(
@@ -186,7 +192,7 @@ func (v *ValidatorService) Start() {
validatorClient := validatorclientfactory.NewValidatorClient(v.conn, restHandler)
valStruct := &validator{
v.validator = &validator{
slotFeed: new(event.Feed),
startBalances: make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
prevEpochBalances: make(map[[fieldparams.BLSPubkeyLength]byte]uint64),
@@ -227,17 +233,45 @@ func (v *ValidatorService) Start() {
eventsChannel: make(chan *eventClient.Event, 1),
}
v.validator = valStruct
go run(v.ctx, v.validator)
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)
hm.Start()
defer v.closeClientFunc()
for {
select {
case <-v.ctx.Done():
log.Info("Validator service context canceled, stopping")
return
case isHealthy := <-hm.HealthyChan():
if !isHealthy {
// wait until the next health tracker update
log.Warn("Validator service health check failed, waiting for healthy beacon node...")
continue
}
log.Info("Starting validator runner")
runnerCtx, runnerCancel := context.WithCancel(v.ctx)
runner, err := newRunner(runnerCtx, v.validator, hm)
if err != nil {
log.WithError(err).Error("Could not create validator runner")
runnerCancel() // Ensure context is cancelled
return
}
go v.validator.StartEventStream(runnerCtx, eventClient.DefaultEventTopics)
runner.run(runnerCtx)
// run is finished if we get to this point
runnerCancel()
}
}
}
// Stop the validator service.
func (v *ValidatorService) Stop() error {
v.cancel()
log.Info("Stopping service")
if v.conn != nil {
return v.conn.GetGrpcClientConn().Close()
}
return nil
}

View File

@@ -11,7 +11,6 @@ go_library(
visibility = ["//validator:__subpackages__"],
deps = [
"//api/client:go_default_library",
"//api/client/beacon/health:go_default_library",
"//api/client/event:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",

View File

@@ -7,7 +7,6 @@ import (
"time"
api "github.com/OffchainLabs/prysm/v6/api/client"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
"github.com/OffchainLabs/prysm/v6/api/client/event"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
@@ -24,49 +23,49 @@ var _ iface.Validator = (*FakeValidator)(nil)
// FakeValidator for mocking.
type FakeValidator struct {
IsRegularDeadline bool
CanChangeHost bool
LogValidatorGainsAndLossesCalled bool
SaveProtectionsCalled bool
DeleteProtectionCalled bool
SlotDeadlineCalled bool
HandleKeyReloadCalled bool
WaitForWalletInitializationCalled bool
SlasherReadyCalled bool
NextSlotCalled bool
AttestToBlockHeadCalled bool
SlasherReadyCalled bool
DoneCalled bool
RoleAtCalled bool
IsRegularDeadline bool
ProposeBlockCalled bool
UpdateProtectionsCalled bool
UpdateDutiesCalled bool
RoleAtCalled bool
IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte
PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64
PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus
ProposerSettingWait time.Duration
NextSlotRet <-chan primitives.Slot
UpdateDutiesArg1 uint64
RoleAtArg1 uint64
AttestToBlockHeadArg1 uint64
ProposeBlockArg1 uint64
RetryTillSuccess int
Balances map[[fieldparams.BLSPubkeyLength]byte]uint64
CanonicalHeadSlotCalled int
WaitForWalletInitializationCalled bool
NextSlotCalled bool
WaitForActivationCalled int
CanonicalHeadSlotCalled int
WaitForSyncCalled int
RetryTillSuccess int
ProposeBlockArg1 uint64
AttestToBlockHeadArg1 uint64
RoleAtArg1 uint64
UpdateDutiesArg1 uint64
NextSlotRet <-chan primitives.Slot
ProposerSettingWait time.Duration
PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus
PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64
IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte
WaitForChainStartCalled int
AttSubmitted chan interface{}
BlockProposed chan interface{}
AccountsChannel chan [][fieldparams.BLSPubkeyLength]byte
EventsChannel chan *event.Event
GenesisT uint64
ReceiveBlocksCalled int
proposerSettings *proposer.Settings
UpdateDutiesRet error
Balances map[[48]byte]uint64
EventsChannel chan *event.Event
ProposerSettingsErr error
Km keymanager.IKeymanager
graffiti string
Tracker health.Tracker
PublicKey string
UpdateDutiesRet error
RolesAtRet []iface.ValidatorRole
}
@@ -337,14 +336,10 @@ func (*FakeValidator) EventStreamIsRunning() bool {
return true
}
func (fv *FakeValidator) HealthTracker() health.Tracker {
return fv.Tracker
}
func (*FakeValidator) Host() string {
return "127.0.0.1:0"
}
func (fv *FakeValidator) ChangeHost() {
fv.Host()
func (fv *FakeValidator) FindHealthyHost(_ context.Context) bool {
return fv.CanChangeHost
}

View File

@@ -16,7 +16,6 @@ import (
"time"
"github.com/OffchainLabs/prysm/v6/api/client"
"github.com/OffchainLabs/prysm/v6/api/client/beacon/health"
eventClient "github.com/OffchainLabs/prysm/v6/api/client/event"
"github.com/OffchainLabs/prysm/v6/api/server/structs"
"github.com/OffchainLabs/prysm/v6/async/event"
@@ -1151,6 +1150,10 @@ func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Sl
}
func (v *validator) StartEventStream(ctx context.Context, topics []string) {
if v.EventStreamIsRunning() {
log.Debug("EventStream is already running")
return
}
log.WithField("topics", topics).Info("Starting event stream")
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
}
@@ -1242,25 +1245,41 @@ func (v *validator) EventStreamIsRunning() bool {
return v.validatorClient.EventStreamIsRunning()
}
func (v *validator) HealthTracker() health.Tracker {
return v.nodeClient.HealthTracker()
}
func (v *validator) Host() string {
return v.validatorClient.Host()
}
func (v *validator) ChangeHost() {
if len(v.beaconNodeHosts) == 1 {
log.Infof("Beacon node at %s is not responding, no backup node configured", v.Host())
return
}
func (v *validator) changeHost() {
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
log.Infof("Beacon node at %s is not responding, switching to %s...", v.beaconNodeHosts[v.currentHostIndex], v.beaconNodeHosts[next])
log.WithFields(logrus.Fields{
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
"nextHost": v.beaconNodeHosts[next],
}).Warn("Beacon node is not responding, switching host")
v.validatorClient.SetHost(v.beaconNodeHosts[next])
v.currentHostIndex = next
}
func (v *validator) FindHealthyHost(ctx context.Context) bool {
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsHealthy(ctx) { // healthy → done
return true
}
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
return false
}
if remaining == 0 || !features.Get().EnableBeaconRESTApi {
return false // exhausted or REST disabled
}
v.changeHost()
return check(remaining - 1) // recurse
}
return check(len(v.beaconNodeHosts))
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
ctx, span := trace.StartSpan(ctx, "validator.filterAndCacheActiveKeys")
defer span.End()

View File

@@ -2838,9 +2838,9 @@ func TestValidator_ChangeHost(t *testing.T) {
client.EXPECT().SetHost(v.beaconNodeHosts[1])
client.EXPECT().SetHost(v.beaconNodeHosts[0])
v.ChangeHost()
v.changeHost()
assert.Equal(t, uint64(1), v.currentHostIndex)
v.ChangeHost()
v.changeHost()
assert.Equal(t, uint64(0), v.currentHostIndex)
}

View File

@@ -58,6 +58,7 @@ type ValidatorClient struct {
wallet *wallet.Wallet
walletInitializedFeed *event.Feed
stop chan struct{} // Channel to wait for termination notifications.
once sync.Once
}
// NewValidatorClient creates a new instance of the Prysm validator client.
@@ -161,13 +162,15 @@ func (c *ValidatorClient) Start() {
// Close handles graceful shutdown of the system.
func (c *ValidatorClient) Close() {
c.lock.Lock()
defer c.lock.Unlock()
c.once.Do(func() { // runs exactly one time
c.lock.Lock()
defer c.lock.Unlock()
c.services.StopAll()
log.Info("Stopping Prysm validator")
c.cancel()
close(c.stop)
c.services.StopAll()
log.Info("Stopping Prysm validator")
c.cancel()
close(c.stop)
})
}
// checkLegacyDatabaseLocation checks is a database exists in the specified location.
@@ -441,6 +444,8 @@ func (c *ValidatorClient) registerValidatorService(cliCtx *cli.Context) error {
LogValidatorPerformance: !cliCtx.Bool(flags.DisablePenaltyRewardLogFlag.Name),
EmitAccountMetrics: !cliCtx.Bool(flags.DisableAccountMetricsFlag.Name),
Distributed: cliCtx.Bool(flags.EnableDistributed.Name),
CloseClientFunc: c.Close,
MaxHealthChecks: cliCtx.Int(flags.MaxHealthChecksFlag.Name),
})
if err != nil {
return errors.Wrap(err, "could not initialize validator service")