From f2d57f0b5fe530fe798b8661b026594410388136 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Wed, 9 Jul 2025 10:39:06 -0500 Subject: [PATCH] changes for safe validator shutdown and restarts on healthcheck (#15401) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * poc changes for safe validator shutdown * simplifying health routine and adding safe shutdown after max restarts reached * fixing health tests * fixing tests * changelog * gofmt * fixing runner * improve how runner times out * improvements to ux on logs * linting * adding in max healthcheck flag * changelog * Update james-prysm_safe-validator-shutdown.md Co-authored-by: Radosław Kapka * Update validator/client/runner.go Co-authored-by: Radosław Kapka * Update validator/client/service.go Co-authored-by: Radosław Kapka * Update validator/client/runner.go Co-authored-by: Radosław Kapka * Update validator/client/runner.go Co-authored-by: Radosław Kapka * addressing some feedback from radek * addressing some more feedback * fixing name based on feedback * fixing mistake on max health checks * conflict accidentally checked in * go 1.23 no longer needs you to stop for the ticker * Update flags.go Co-authored-by: Radosław Kapka * wip no unit test for recursive healthy host find * rework healthcheck * gaz * fixing bugs and improving logs with new monitor * removing health tracker, fixing runner tests, and adding placeholder for monitor tests * fixing event stream check * linting * adding in health monitor tests * gaz * improving test * removing some log.fatals * forgot to remove comment * missed fatal removal * doppelganger should exit the node safely now * Update validator/client/health_monitor.go Co-authored-by: Radosław Kapka * radek review * Update validator/client/validator.go Co-authored-by: Radosław Kapka * Update validator/client/validator.go Co-authored-by: Radosław Kapka * Update validator/client/health_monitor.go 
Co-authored-by: Radosław Kapka * Update validator/client/health_monitor.go Co-authored-by: Radosław Kapka * Update validator/client/health_monitor.go Co-authored-by: Radosław Kapka * Update validator/client/validator.go Co-authored-by: Radosław Kapka * radek feedback * read up on more suggestions and making fixes to channel * suggested updates after more reading * reverting some of this because it froze the validator after healthcheck failed * fully reverting * some improvements I found during testing * Update cmd/validator/flags/flags.go Co-authored-by: Preston Van Loon * preston's feedback * clarifications on changelog * converted to using an event feed instead of my own channel publishing implementation, adding relevant logs * preston log suggestion --------- Co-authored-by: Radosław Kapka Co-authored-by: Preston Van Loon --- api/client/beacon/health/BUILD.bazel | 20 - api/client/beacon/health/health.go | 58 -- api/client/beacon/health/health_test.go | 110 ---- api/client/beacon/health/interfaces.go | 13 - api/client/beacon/health/mock.go | 58 -- .../james-prysm_safe-validator-shutdown.md | 11 + cmd/validator/flags/flags.go | 9 + cmd/validator/main.go | 1 + cmd/validator/usage.go | 1 + testing/endtoend/endtoend_test.go | 5 +- testing/validator-mock/BUILD.bazel | 4 +- testing/validator-mock/node_client_mock.go | 48 +- testing/validator-mock/validator_mock.go | 536 ++++++++++++++++++ validator/client/BUILD.bazel | 6 +- validator/client/beacon-api/BUILD.bazel | 1 - .../beacon-api/beacon_api_node_client.go | 13 +- validator/client/grpc-api/BUILD.bazel | 1 - validator/client/grpc-api/grpc_node_client.go | 11 +- validator/client/health_monitor.go | 107 ++++ validator/client/health_monitor_test.go | 255 +++++++++ validator/client/iface/BUILD.bazel | 1 - validator/client/iface/node_client.go | 3 +- validator/client/iface/validator.go | 4 +- validator/client/runner.go | 131 ++--- validator/client/runner_test.go | 128 ++--- validator/client/service.go | 46 +- 
validator/client/testutil/BUILD.bazel | 1 - validator/client/testutil/mock_validator.go | 49 +- validator/client/validator.go | 41 +- validator/client/validator_test.go | 4 +- validator/node/node.go | 17 +- 31 files changed, 1154 insertions(+), 539 deletions(-) delete mode 100644 api/client/beacon/health/BUILD.bazel delete mode 100644 api/client/beacon/health/health.go delete mode 100644 api/client/beacon/health/health_test.go delete mode 100644 api/client/beacon/health/interfaces.go delete mode 100644 api/client/beacon/health/mock.go create mode 100644 changelog/james-prysm_safe-validator-shutdown.md create mode 100644 testing/validator-mock/validator_mock.go create mode 100644 validator/client/health_monitor.go create mode 100644 validator/client/health_monitor_test.go diff --git a/api/client/beacon/health/BUILD.bazel b/api/client/beacon/health/BUILD.bazel deleted file mode 100644 index 77b0911647..0000000000 --- a/api/client/beacon/health/BUILD.bazel +++ /dev/null @@ -1,20 +0,0 @@ -load("@prysm//tools/go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = [ - "health.go", - "interfaces.go", - "mock.go", - ], - importpath = "github.com/OffchainLabs/prysm/v6/api/client/beacon/health", - visibility = ["//visibility:public"], - deps = ["@org_uber_go_mock//gomock:go_default_library"], -) - -go_test( - name = "go_default_test", - srcs = ["health_test.go"], - embed = [":go_default_library"], - deps = ["@org_uber_go_mock//gomock:go_default_library"], -) diff --git a/api/client/beacon/health/health.go b/api/client/beacon/health/health.go deleted file mode 100644 index 59cfc4b89f..0000000000 --- a/api/client/beacon/health/health.go +++ /dev/null @@ -1,58 +0,0 @@ -package health - -import ( - "context" - "sync" -) - -type NodeHealthTracker struct { - isHealthy *bool - healthChan chan bool - node Node - sync.RWMutex -} - -func NewTracker(node Node) Tracker { - return &NodeHealthTracker{ - node: node, - healthChan: make(chan bool, 1), - 
} -} - -// HealthUpdates provides a read-only channel for health updates. -func (n *NodeHealthTracker) HealthUpdates() <-chan bool { - return n.healthChan -} - -func (n *NodeHealthTracker) IsHealthy(_ context.Context) bool { - n.RLock() - defer n.RUnlock() - if n.isHealthy == nil { - return false - } - return *n.isHealthy -} - -func (n *NodeHealthTracker) CheckHealth(ctx context.Context) bool { - n.Lock() - defer n.Unlock() - - newStatus := n.node.IsHealthy(ctx) - if n.isHealthy == nil { - n.isHealthy = &newStatus - } - - isStatusChanged := newStatus != *n.isHealthy - if isStatusChanged { - // Update the health status - n.isHealthy = &newStatus - // Send the new status to the health channel, potentially overwriting the existing value - select { - case <-n.healthChan: - n.healthChan <- newStatus - default: - n.healthChan <- newStatus - } - } - return newStatus -} diff --git a/api/client/beacon/health/health_test.go b/api/client/beacon/health/health_test.go deleted file mode 100644 index 6d9e3da595..0000000000 --- a/api/client/beacon/health/health_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package health - -import ( - "sync" - "testing" - - "go.uber.org/mock/gomock" -) - -func TestNodeHealth_IsHealthy(t *testing.T) { - tests := []struct { - name string - isHealthy bool - want bool - }{ - {"initially healthy", true, true}, - {"initially unhealthy", false, false}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - n := &NodeHealthTracker{ - isHealthy: &tt.isHealthy, - healthChan: make(chan bool, 1), - } - if got := n.IsHealthy(t.Context()); got != tt.want { - t.Errorf("IsHealthy() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestNodeHealth_UpdateNodeHealth(t *testing.T) { - tests := []struct { - name string - initial bool // Initial health status - newStatus bool // Status to update to - shouldSend bool // Should a message be sent through the channel - }{ - {"healthy to unhealthy", true, false, true}, - {"unhealthy to healthy", false, true, 
true}, - {"remain healthy", true, true, false}, - {"remain unhealthy", false, false, false}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - client := NewMockHealthClient(ctrl) - client.EXPECT().IsHealthy(gomock.Any()).Return(tt.newStatus) - n := &NodeHealthTracker{ - isHealthy: &tt.initial, - node: client, - healthChan: make(chan bool, 1), - } - - s := n.CheckHealth(t.Context()) - // Check if health status was updated - if s != tt.newStatus { - t.Errorf("UpdateNodeHealth() failed to update isHealthy from %v to %v", tt.initial, tt.newStatus) - } - - select { - case status := <-n.HealthUpdates(): - if !tt.shouldSend { - t.Errorf("UpdateNodeHealth() unexpectedly sent status %v to HealthCh", status) - } else if status != tt.newStatus { - t.Errorf("UpdateNodeHealth() sent wrong status %v, want %v", status, tt.newStatus) - } - default: - if tt.shouldSend { - t.Error("UpdateNodeHealth() did not send any status to HealthCh when expected") - } - } - }) - } -} - -func TestNodeHealth_Concurrency(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - client := NewMockHealthClient(ctrl) - n := NewTracker(client) - var wg sync.WaitGroup - - // Number of goroutines to spawn for both reading and writing - numGoroutines := 6 - - wg.Add(numGoroutines * 2) // for readers and writers - - // Concurrently update health status - for i := 0; i < numGoroutines; i++ { - go func() { - defer wg.Done() - client.EXPECT().IsHealthy(gomock.Any()).Return(false).Times(1) - n.CheckHealth(t.Context()) - client.EXPECT().IsHealthy(gomock.Any()).Return(true).Times(1) - n.CheckHealth(t.Context()) - }() - } - - // Concurrently read health status - for i := 0; i < numGoroutines; i++ { - go func() { - defer wg.Done() - _ = n.IsHealthy(t.Context()) // Just read the value - }() - } - - wg.Wait() // Wait for all goroutines to finish -} diff --git a/api/client/beacon/health/interfaces.go 
b/api/client/beacon/health/interfaces.go deleted file mode 100644 index 7a188e06a5..0000000000 --- a/api/client/beacon/health/interfaces.go +++ /dev/null @@ -1,13 +0,0 @@ -package health - -import "context" - -type Tracker interface { - HealthUpdates() <-chan bool - CheckHealth(ctx context.Context) bool - Node -} - -type Node interface { - IsHealthy(ctx context.Context) bool -} diff --git a/api/client/beacon/health/mock.go b/api/client/beacon/health/mock.go deleted file mode 100644 index 8d6f591281..0000000000 --- a/api/client/beacon/health/mock.go +++ /dev/null @@ -1,58 +0,0 @@ -package health - -import ( - "context" - "reflect" - "sync" - - "go.uber.org/mock/gomock" -) - -var ( - _ = Node(&MockHealthClient{}) -) - -// MockHealthClient is a mock of HealthClient interface. -type MockHealthClient struct { - ctrl *gomock.Controller - recorder *MockHealthClientMockRecorder - sync.Mutex -} - -// MockHealthClientMockRecorder is the mock recorder for MockHealthClient. -type MockHealthClientMockRecorder struct { - mock *MockHealthClient -} - -// IsHealthy mocks base method. -func (m *MockHealthClient) IsHealthy(arg0 context.Context) bool { - m.Lock() - defer m.Unlock() - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsHealthy", arg0) - ret0, ok := ret[0].(bool) - if !ok { - return false - } - return ret0 -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockHealthClient) EXPECT() *MockHealthClientMockRecorder { - return m.recorder -} - -// IsHealthy indicates an expected call of IsHealthy. -func (mr *MockHealthClientMockRecorder) IsHealthy(arg0 any) *gomock.Call { - mr.mock.Lock() - defer mr.mock.Unlock() - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockHealthClient)(nil).IsHealthy), arg0) -} - -// NewMockHealthClient creates a new mock instance. 
-func NewMockHealthClient(ctrl *gomock.Controller) *MockHealthClient { - mock := &MockHealthClient{ctrl: ctrl} - mock.recorder = &MockHealthClientMockRecorder{mock} - return mock -} diff --git a/changelog/james-prysm_safe-validator-shutdown.md b/changelog/james-prysm_safe-validator-shutdown.md new file mode 100644 index 0000000000..74d9a4eb63 --- /dev/null +++ b/changelog/james-prysm_safe-validator-shutdown.md @@ -0,0 +1,11 @@ +## Added + +- Added `max-health-checks` flag that sets the maximum times the validator tries to check the health of the beacon node before timing out. 0 or a negative number is indefinite. (the default is 0) + +## Fixed + +- Validator client shuts down cleanly on error instead of fatal error. + +## Changed + +- Previously, we optimistically believed the beacon node was healthy and tried to get chain start, but now we do a health check at the start. \ No newline at end of file diff --git a/cmd/validator/flags/flags.go b/cmd/validator/flags/flags.go index 9eb417e9f4..51414da2c0 100644 --- a/cmd/validator/flags/flags.go +++ b/cmd/validator/flags/flags.go @@ -19,6 +19,8 @@ const ( WalletDefaultDirName = "prysm-wallet-v2" // DefaultHTTPServerHost for the validator client. DefaultHTTPServerHost = "127.0.0.1" + + DefaultMaxHealthChecks = 0 ) var ( @@ -394,6 +396,13 @@ var ( Usage: "Disables polling of duties on dependent root changes.", Value: false, } + + // MaxHealthChecksFlag sets a maximum amount of times to check for beacon node health before validator client times out and shuts down + MaxHealthChecksFlag = &cli.IntFlag{ + Name: "max-health-checks", + Usage: "Maximum number of health checks to perform before exiting if not healthy. Set to 0 or a negative number for indefinite checks.", + Value: DefaultMaxHealthChecks, + } ) // DefaultValidatorDir returns OS-specific default validator directory. 
diff --git a/cmd/validator/main.go b/cmd/validator/main.go index 04a138fe89..f109de0dee 100644 --- a/cmd/validator/main.go +++ b/cmd/validator/main.go @@ -76,6 +76,7 @@ var appFlags = []cli.Flag{ flags.EnableDistributed, flags.AuthTokenPathFlag, flags.DisableDutiesPolling, + flags.MaxHealthChecksFlag, // Consensys' Web3Signer flags flags.Web3SignerURLFlag, flags.Web3SignerPublicValidatorKeysFlag, diff --git a/cmd/validator/usage.go b/cmd/validator/usage.go index 68255848f7..00c6c13a68 100644 --- a/cmd/validator/usage.go +++ b/cmd/validator/usage.go @@ -141,6 +141,7 @@ var appHelpFlagGroups = []flagGroup{ flags.EnableDistributed, flags.AuthTokenPathFlag, flags.DisableDutiesPolling, + flags.MaxHealthChecksFlag, }, }, { diff --git a/testing/endtoend/endtoend_test.go b/testing/endtoend/endtoend_test.go index cc7e0f1589..9c7738e375 100644 --- a/testing/endtoend/endtoend_test.go +++ b/testing/endtoend/endtoend_test.go @@ -425,10 +425,7 @@ func (r *testRunner) testDoppelGangerProtection(ctx context.Context) error { if r.t.Failed() { return errors.New("doppelganger was unable to be found") } - // Expect an abrupt exit for the validator client. 
- if err := g.Wait(); err == nil || !strings.Contains(err.Error(), errGeneralCode) { - return fmt.Errorf("wanted an error of %s but received %v", errGeneralCode, err) - } + require.NoError(r.t, g.Wait()) return nil } diff --git a/testing/validator-mock/BUILD.bazel b/testing/validator-mock/BUILD.bazel index c5bfed3f24..b9c56d2452 100644 --- a/testing/validator-mock/BUILD.bazel +++ b/testing/validator-mock/BUILD.bazel @@ -9,16 +9,18 @@ go_library( "node_client_mock.go", "prysm_chain_client_mock.go", "validator_client_mock.go", + "validator_mock.go", ], importpath = "github.com/OffchainLabs/prysm/v6/testing/validator-mock", visibility = ["//visibility:public"], deps = [ - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", + "//config/proposer:go_default_library", "//consensus-types/primitives:go_default_library", "//consensus-types/validator:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//validator/client/iface:go_default_library", + "//validator/keymanager:go_default_library", "@org_golang_google_protobuf//types/known/emptypb:go_default_library", "@org_uber_go_mock//gomock:go_default_library", ], diff --git a/testing/validator-mock/node_client_mock.go b/testing/validator-mock/node_client_mock.go index 7168fd7913..d26032d379 100644 --- a/testing/validator-mock/node_client_mock.go +++ b/testing/validator-mock/node_client_mock.go @@ -13,7 +13,6 @@ import ( context "context" reflect "reflect" - health "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" gomock "go.uber.org/mock/gomock" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -23,7 +22,6 @@ import ( type MockNodeClient struct { ctrl *gomock.Controller recorder *MockNodeClientMockRecorder - isgomock struct{} } // MockNodeClientMockRecorder is the mock recorder for MockNodeClient. 
@@ -44,75 +42,75 @@ func (m *MockNodeClient) EXPECT() *MockNodeClientMockRecorder { } // Genesis mocks base method. -func (m *MockNodeClient) Genesis(ctx context.Context, in *emptypb.Empty) (*eth.Genesis, error) { +func (m *MockNodeClient) Genesis(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Genesis, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Genesis", ctx, in) + ret := m.ctrl.Call(m, "Genesis", arg0, arg1) ret0, _ := ret[0].(*eth.Genesis) ret1, _ := ret[1].(error) return ret0, ret1 } // Genesis indicates an expected call of Genesis. -func (mr *MockNodeClientMockRecorder) Genesis(ctx, in any) *gomock.Call { +func (mr *MockNodeClientMockRecorder) Genesis(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), ctx, in) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Genesis", reflect.TypeOf((*MockNodeClient)(nil).Genesis), arg0, arg1) } -// HealthTracker mocks base method. -func (m *MockNodeClient) HealthTracker() health.Tracker { +// IsHealthy mocks base method. +func (m *MockNodeClient) IsHealthy(arg0 context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HealthTracker") - ret0, _ := ret[0].(health.Tracker) + ret := m.ctrl.Call(m, "IsHealthy", arg0) + ret0, _ := ret[0].(bool) return ret0 } -// HealthTracker indicates an expected call of HealthTracker. -func (mr *MockNodeClientMockRecorder) HealthTracker() *gomock.Call { +// IsHealthy indicates an expected call of IsHealthy. +func (mr *MockNodeClientMockRecorder) IsHealthy(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HealthTracker", reflect.TypeOf((*MockNodeClient)(nil).HealthTracker)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockNodeClient)(nil).IsHealthy), arg0) } // Peers mocks base method. 
-func (m *MockNodeClient) Peers(ctx context.Context, in *emptypb.Empty) (*eth.Peers, error) { +func (m *MockNodeClient) Peers(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Peers, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Peers", ctx, in) + ret := m.ctrl.Call(m, "Peers", arg0, arg1) ret0, _ := ret[0].(*eth.Peers) ret1, _ := ret[1].(error) return ret0, ret1 } // Peers indicates an expected call of Peers. -func (mr *MockNodeClientMockRecorder) Peers(ctx, in any) *gomock.Call { +func (mr *MockNodeClientMockRecorder) Peers(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockNodeClient)(nil).Peers), ctx, in) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockNodeClient)(nil).Peers), arg0, arg1) } // SyncStatus mocks base method. -func (m *MockNodeClient) SyncStatus(ctx context.Context, in *emptypb.Empty) (*eth.SyncStatus, error) { +func (m *MockNodeClient) SyncStatus(arg0 context.Context, arg1 *emptypb.Empty) (*eth.SyncStatus, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SyncStatus", ctx, in) + ret := m.ctrl.Call(m, "SyncStatus", arg0, arg1) ret0, _ := ret[0].(*eth.SyncStatus) ret1, _ := ret[1].(error) return ret0, ret1 } // SyncStatus indicates an expected call of SyncStatus. -func (mr *MockNodeClientMockRecorder) SyncStatus(ctx, in any) *gomock.Call { +func (mr *MockNodeClientMockRecorder) SyncStatus(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncStatus", reflect.TypeOf((*MockNodeClient)(nil).SyncStatus), ctx, in) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncStatus", reflect.TypeOf((*MockNodeClient)(nil).SyncStatus), arg0, arg1) } // Version mocks base method. 
-func (m *MockNodeClient) Version(ctx context.Context, in *emptypb.Empty) (*eth.Version, error) { +func (m *MockNodeClient) Version(arg0 context.Context, arg1 *emptypb.Empty) (*eth.Version, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Version", ctx, in) + ret := m.ctrl.Call(m, "Version", arg0, arg1) ret0, _ := ret[0].(*eth.Version) ret1, _ := ret[1].(error) return ret0, ret1 } // Version indicates an expected call of Version. -func (mr *MockNodeClientMockRecorder) Version(ctx, in any) *gomock.Call { +func (mr *MockNodeClientMockRecorder) Version(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Version", reflect.TypeOf((*MockNodeClient)(nil).Version), ctx, in) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Version", reflect.TypeOf((*MockNodeClient)(nil).Version), arg0, arg1) } diff --git a/testing/validator-mock/validator_mock.go b/testing/validator-mock/validator_mock.go new file mode 100644 index 0000000000..5984e1b493 --- /dev/null +++ b/testing/validator-mock/validator_mock.go @@ -0,0 +1,536 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/OffchainLabs/prysm/v6/validator/client/iface (interfaces: Validator) +// +// Generated by this command: +// +// mockgen -package=validator_mock -destination=testing/validator-mock/validator_mock.go github.com/OffchainLabs/prysm/v6/validator/client/iface Validator +// + +// Package validator_mock is a generated GoMock package. 
+package validator_mock + +import ( + context "context" + reflect "reflect" + time "time" + + event "github.com/OffchainLabs/prysm/v6/api/client/event" + proposer "github.com/OffchainLabs/prysm/v6/config/proposer" + primitives "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + iface "github.com/OffchainLabs/prysm/v6/validator/client/iface" + keymanager "github.com/OffchainLabs/prysm/v6/validator/keymanager" + gomock "go.uber.org/mock/gomock" +) + +// MockValidator is a mock of Validator interface. +type MockValidator struct { + ctrl *gomock.Controller + recorder *MockValidatorMockRecorder +} + +// MockValidatorMockRecorder is the mock recorder for MockValidator. +type MockValidatorMockRecorder struct { + mock *MockValidator +} + +// NewMockValidator creates a new mock instance. +func NewMockValidator(ctrl *gomock.Controller) *MockValidator { + mock := &MockValidator{ctrl: ctrl} + mock.recorder = &MockValidatorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockValidator) EXPECT() *MockValidatorMockRecorder { + return m.recorder +} + +// AccountsChangedChan mocks base method. +func (m *MockValidator) AccountsChangedChan() <-chan [][48]byte { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AccountsChangedChan") + ret0, _ := ret[0].(<-chan [][48]byte) + return ret0 +} + +// AccountsChangedChan indicates an expected call of AccountsChangedChan. +func (mr *MockValidatorMockRecorder) AccountsChangedChan() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AccountsChangedChan", reflect.TypeOf((*MockValidator)(nil).AccountsChangedChan)) +} + +// CanonicalHeadSlot mocks base method. 
+func (m *MockValidator) CanonicalHeadSlot(arg0 context.Context) (primitives.Slot, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CanonicalHeadSlot", arg0) + ret0, _ := ret[0].(primitives.Slot) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CanonicalHeadSlot indicates an expected call of CanonicalHeadSlot. +func (mr *MockValidatorMockRecorder) CanonicalHeadSlot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanonicalHeadSlot", reflect.TypeOf((*MockValidator)(nil).CanonicalHeadSlot), arg0) +} + +// CheckDoppelGanger mocks base method. +func (m *MockValidator) CheckDoppelGanger(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckDoppelGanger", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckDoppelGanger indicates an expected call of CheckDoppelGanger. +func (mr *MockValidatorMockRecorder) CheckDoppelGanger(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckDoppelGanger", reflect.TypeOf((*MockValidator)(nil).CheckDoppelGanger), arg0) +} + +// DeleteGraffiti mocks base method. +func (m *MockValidator) DeleteGraffiti(arg0 context.Context, arg1 [48]byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteGraffiti", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteGraffiti indicates an expected call of DeleteGraffiti. +func (mr *MockValidatorMockRecorder) DeleteGraffiti(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteGraffiti", reflect.TypeOf((*MockValidator)(nil).DeleteGraffiti), arg0, arg1) +} + +// Done mocks base method. +func (m *MockValidator) Done() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Done") +} + +// Done indicates an expected call of Done. 
+func (mr *MockValidatorMockRecorder) Done() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Done", reflect.TypeOf((*MockValidator)(nil).Done)) +} + +// EventStreamIsRunning mocks base method. +func (m *MockValidator) EventStreamIsRunning() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EventStreamIsRunning") + ret0, _ := ret[0].(bool) + return ret0 +} + +// EventStreamIsRunning indicates an expected call of EventStreamIsRunning. +func (mr *MockValidatorMockRecorder) EventStreamIsRunning() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventStreamIsRunning", reflect.TypeOf((*MockValidator)(nil).EventStreamIsRunning)) +} + +// EventsChan mocks base method. +func (m *MockValidator) EventsChan() <-chan *event.Event { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EventsChan") + ret0, _ := ret[0].(<-chan *event.Event) + return ret0 +} + +// EventsChan indicates an expected call of EventsChan. +func (mr *MockValidatorMockRecorder) EventsChan() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventsChan", reflect.TypeOf((*MockValidator)(nil).EventsChan)) +} + +// FindHealthyHost mocks base method. +func (m *MockValidator) FindHealthyHost(arg0 context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindHealthyHost", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// FindHealthyHost indicates an expected call of FindHealthyHost. +func (mr *MockValidatorMockRecorder) FindHealthyHost(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindHealthyHost", reflect.TypeOf((*MockValidator)(nil).FindHealthyHost), arg0) +} + +// Graffiti mocks base method. 
+func (m *MockValidator) Graffiti(arg0 context.Context, arg1 [48]byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Graffiti", arg0, arg1) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Graffiti indicates an expected call of Graffiti. +func (mr *MockValidatorMockRecorder) Graffiti(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Graffiti", reflect.TypeOf((*MockValidator)(nil).Graffiti), arg0, arg1) +} + +// HandleKeyReload mocks base method. +func (m *MockValidator) HandleKeyReload(arg0 context.Context, arg1 [][48]byte) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HandleKeyReload", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HandleKeyReload indicates an expected call of HandleKeyReload. +func (mr *MockValidatorMockRecorder) HandleKeyReload(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleKeyReload", reflect.TypeOf((*MockValidator)(nil).HandleKeyReload), arg0, arg1) +} + +// Host mocks base method. +func (m *MockValidator) Host() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Host") + ret0, _ := ret[0].(string) + return ret0 +} + +// Host indicates an expected call of Host. +func (mr *MockValidatorMockRecorder) Host() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Host", reflect.TypeOf((*MockValidator)(nil).Host)) +} + +// Keymanager mocks base method. +func (m *MockValidator) Keymanager() (keymanager.IKeymanager, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Keymanager") + ret0, _ := ret[0].(keymanager.IKeymanager) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Keymanager indicates an expected call of Keymanager. 
+func (mr *MockValidatorMockRecorder) Keymanager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keymanager", reflect.TypeOf((*MockValidator)(nil).Keymanager)) +} + +// LogSubmittedAtts mocks base method. +func (m *MockValidator) LogSubmittedAtts(arg0 primitives.Slot) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "LogSubmittedAtts", arg0) +} + +// LogSubmittedAtts indicates an expected call of LogSubmittedAtts. +func (mr *MockValidatorMockRecorder) LogSubmittedAtts(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogSubmittedAtts", reflect.TypeOf((*MockValidator)(nil).LogSubmittedAtts), arg0) +} + +// LogSubmittedSyncCommitteeMessages mocks base method. +func (m *MockValidator) LogSubmittedSyncCommitteeMessages() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "LogSubmittedSyncCommitteeMessages") +} + +// LogSubmittedSyncCommitteeMessages indicates an expected call of LogSubmittedSyncCommitteeMessages. +func (mr *MockValidatorMockRecorder) LogSubmittedSyncCommitteeMessages() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogSubmittedSyncCommitteeMessages", reflect.TypeOf((*MockValidator)(nil).LogSubmittedSyncCommitteeMessages)) +} + +// LogValidatorGainsAndLosses mocks base method. +func (m *MockValidator) LogValidatorGainsAndLosses(arg0 context.Context, arg1 primitives.Slot) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LogValidatorGainsAndLosses", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// LogValidatorGainsAndLosses indicates an expected call of LogValidatorGainsAndLosses. +func (mr *MockValidatorMockRecorder) LogValidatorGainsAndLosses(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LogValidatorGainsAndLosses", reflect.TypeOf((*MockValidator)(nil).LogValidatorGainsAndLosses), arg0, arg1) +} + +// NextSlot mocks base method. 
+func (m *MockValidator) NextSlot() <-chan primitives.Slot { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NextSlot") + ret0, _ := ret[0].(<-chan primitives.Slot) + return ret0 +} + +// NextSlot indicates an expected call of NextSlot. +func (mr *MockValidatorMockRecorder) NextSlot() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NextSlot", reflect.TypeOf((*MockValidator)(nil).NextSlot)) +} + +// ProcessEvent mocks base method. +func (m *MockValidator) ProcessEvent(arg0 context.Context, arg1 *event.Event) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ProcessEvent", arg0, arg1) +} + +// ProcessEvent indicates an expected call of ProcessEvent. +func (mr *MockValidatorMockRecorder) ProcessEvent(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessEvent", reflect.TypeOf((*MockValidator)(nil).ProcessEvent), arg0, arg1) +} + +// ProposeBlock mocks base method. +func (m *MockValidator) ProposeBlock(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ProposeBlock", arg0, arg1, arg2) +} + +// ProposeBlock indicates an expected call of ProposeBlock. +func (mr *MockValidatorMockRecorder) ProposeBlock(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeBlock", reflect.TypeOf((*MockValidator)(nil).ProposeBlock), arg0, arg1, arg2) +} + +// ProposerSettings mocks base method. +func (m *MockValidator) ProposerSettings() *proposer.Settings { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProposerSettings") + ret0, _ := ret[0].(*proposer.Settings) + return ret0 +} + +// ProposerSettings indicates an expected call of ProposerSettings. 
+func (mr *MockValidatorMockRecorder) ProposerSettings() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposerSettings", reflect.TypeOf((*MockValidator)(nil).ProposerSettings)) +} + +// PushProposerSettings mocks base method. +func (m *MockValidator) PushProposerSettings(arg0 context.Context, arg1 primitives.Slot, arg2 bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PushProposerSettings", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// PushProposerSettings indicates an expected call of PushProposerSettings. +func (mr *MockValidatorMockRecorder) PushProposerSettings(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushProposerSettings", reflect.TypeOf((*MockValidator)(nil).PushProposerSettings), arg0, arg1, arg2) +} + +// RolesAt mocks base method. +func (m *MockValidator) RolesAt(arg0 context.Context, arg1 primitives.Slot) (map[[48]byte][]iface.ValidatorRole, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RolesAt", arg0, arg1) + ret0, _ := ret[0].(map[[48]byte][]iface.ValidatorRole) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RolesAt indicates an expected call of RolesAt. +func (mr *MockValidatorMockRecorder) RolesAt(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RolesAt", reflect.TypeOf((*MockValidator)(nil).RolesAt), arg0, arg1) +} + +// SetGraffiti mocks base method. +func (m *MockValidator) SetGraffiti(arg0 context.Context, arg1 [48]byte, arg2 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetGraffiti", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetGraffiti indicates an expected call of SetGraffiti. 
+func (mr *MockValidatorMockRecorder) SetGraffiti(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetGraffiti", reflect.TypeOf((*MockValidator)(nil).SetGraffiti), arg0, arg1, arg2) +} + +// SetProposerSettings mocks base method. +func (m *MockValidator) SetProposerSettings(arg0 context.Context, arg1 *proposer.Settings) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetProposerSettings", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetProposerSettings indicates an expected call of SetProposerSettings. +func (mr *MockValidatorMockRecorder) SetProposerSettings(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetProposerSettings", reflect.TypeOf((*MockValidator)(nil).SetProposerSettings), arg0, arg1) +} + +// SignValidatorRegistrationRequest mocks base method. +func (m *MockValidator) SignValidatorRegistrationRequest(arg0 context.Context, arg1 iface.SigningFunc, arg2 *eth.ValidatorRegistrationV1) (*eth.SignedValidatorRegistrationV1, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SignValidatorRegistrationRequest", arg0, arg1, arg2) + ret0, _ := ret[0].(*eth.SignedValidatorRegistrationV1) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// SignValidatorRegistrationRequest indicates an expected call of SignValidatorRegistrationRequest. +func (mr *MockValidatorMockRecorder) SignValidatorRegistrationRequest(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignValidatorRegistrationRequest", reflect.TypeOf((*MockValidator)(nil).SignValidatorRegistrationRequest), arg0, arg1, arg2) +} + +// SlotDeadline mocks base method. 
+func (m *MockValidator) SlotDeadline(arg0 primitives.Slot) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SlotDeadline", arg0) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// SlotDeadline indicates an expected call of SlotDeadline. +func (mr *MockValidatorMockRecorder) SlotDeadline(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SlotDeadline", reflect.TypeOf((*MockValidator)(nil).SlotDeadline), arg0) +} + +// StartEventStream mocks base method. +func (m *MockValidator) StartEventStream(arg0 context.Context, arg1 []string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StartEventStream", arg0, arg1) +} + +// StartEventStream indicates an expected call of StartEventStream. +func (mr *MockValidatorMockRecorder) StartEventStream(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidator)(nil).StartEventStream), arg0, arg1) +} + +// SubmitAggregateAndProof mocks base method. +func (m *MockValidator) SubmitAggregateAndProof(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubmitAggregateAndProof", arg0, arg1, arg2) +} + +// SubmitAggregateAndProof indicates an expected call of SubmitAggregateAndProof. +func (mr *MockValidatorMockRecorder) SubmitAggregateAndProof(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitAggregateAndProof", reflect.TypeOf((*MockValidator)(nil).SubmitAggregateAndProof), arg0, arg1, arg2) +} + +// SubmitAttestation mocks base method. +func (m *MockValidator) SubmitAttestation(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubmitAttestation", arg0, arg1, arg2) +} + +// SubmitAttestation indicates an expected call of SubmitAttestation. 
+func (mr *MockValidatorMockRecorder) SubmitAttestation(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitAttestation", reflect.TypeOf((*MockValidator)(nil).SubmitAttestation), arg0, arg1, arg2) +} + +// SubmitSignedContributionAndProof mocks base method. +func (m *MockValidator) SubmitSignedContributionAndProof(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubmitSignedContributionAndProof", arg0, arg1, arg2) +} + +// SubmitSignedContributionAndProof indicates an expected call of SubmitSignedContributionAndProof. +func (mr *MockValidatorMockRecorder) SubmitSignedContributionAndProof(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitSignedContributionAndProof", reflect.TypeOf((*MockValidator)(nil).SubmitSignedContributionAndProof), arg0, arg1, arg2) +} + +// SubmitSyncCommitteeMessage mocks base method. +func (m *MockValidator) SubmitSyncCommitteeMessage(arg0 context.Context, arg1 primitives.Slot, arg2 [48]byte) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SubmitSyncCommitteeMessage", arg0, arg1, arg2) +} + +// SubmitSyncCommitteeMessage indicates an expected call of SubmitSyncCommitteeMessage. +func (mr *MockValidatorMockRecorder) SubmitSyncCommitteeMessage(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitSyncCommitteeMessage", reflect.TypeOf((*MockValidator)(nil).SubmitSyncCommitteeMessage), arg0, arg1, arg2) +} + +// UpdateDomainDataCaches mocks base method. +func (m *MockValidator) UpdateDomainDataCaches(arg0 context.Context, arg1 primitives.Slot) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "UpdateDomainDataCaches", arg0, arg1) +} + +// UpdateDomainDataCaches indicates an expected call of UpdateDomainDataCaches. 
+func (mr *MockValidatorMockRecorder) UpdateDomainDataCaches(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDomainDataCaches", reflect.TypeOf((*MockValidator)(nil).UpdateDomainDataCaches), arg0, arg1) +} + +// UpdateDuties mocks base method. +func (m *MockValidator) UpdateDuties(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateDuties", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateDuties indicates an expected call of UpdateDuties. +func (mr *MockValidatorMockRecorder) UpdateDuties(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDuties", reflect.TypeOf((*MockValidator)(nil).UpdateDuties), arg0) +} + +// WaitForActivation mocks base method. +func (m *MockValidator) WaitForActivation(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForActivation", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WaitForActivation indicates an expected call of WaitForActivation. +func (mr *MockValidatorMockRecorder) WaitForActivation(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForActivation", reflect.TypeOf((*MockValidator)(nil).WaitForActivation), arg0) +} + +// WaitForChainStart mocks base method. +func (m *MockValidator) WaitForChainStart(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForChainStart", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WaitForChainStart indicates an expected call of WaitForChainStart. +func (mr *MockValidatorMockRecorder) WaitForChainStart(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForChainStart", reflect.TypeOf((*MockValidator)(nil).WaitForChainStart), arg0) +} + +// WaitForKeymanagerInitialization mocks base method. 
+func (m *MockValidator) WaitForKeymanagerInitialization(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForKeymanagerInitialization", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WaitForKeymanagerInitialization indicates an expected call of WaitForKeymanagerInitialization. +func (mr *MockValidatorMockRecorder) WaitForKeymanagerInitialization(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForKeymanagerInitialization", reflect.TypeOf((*MockValidator)(nil).WaitForKeymanagerInitialization), arg0) +} + +// WaitForSync mocks base method. +func (m *MockValidator) WaitForSync(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForSync", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WaitForSync indicates an expected call of WaitForSync. +func (mr *MockValidatorMockRecorder) WaitForSync(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForSync", reflect.TypeOf((*MockValidator)(nil).WaitForSync), arg0) +} diff --git a/validator/client/BUILD.bazel b/validator/client/BUILD.bazel index 7556e972a8..21d5fc8f55 100644 --- a/validator/client/BUILD.bazel +++ b/validator/client/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "aggregate.go", "attest.go", + "health_monitor.go", "key_reload.go", "log.go", "metrics.go", @@ -24,7 +25,6 @@ go_library( ], deps = [ "//api/client:go_default_library", - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//api/grpc:go_default_library", "//api/server/structs:go_default_library", @@ -104,6 +104,7 @@ go_test( srcs = [ "aggregate_test.go", "attest_test.go", + "health_monitor_test.go", "key_reload_test.go", "log_test.go", "metrics_test.go", @@ -121,7 +122,6 @@ go_test( ], embed = [":go_default_library"], deps = [ - "//api/client/beacon/health:go_default_library", "//api/server/structs:go_default_library", 
"//async/event:go_default_library", "//beacon-chain/core/signing:go_default_library", @@ -169,6 +169,8 @@ go_test( "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + "@com_github_stretchr_testify//require:go_default_library", "@com_github_tyler_smith_go_bip39//:go_default_library", "@com_github_urfave_cli_v2//:go_default_library", "@com_github_wealdtech_go_eth2_util//:go_default_library", diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index b91480254e..23d675cc88 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -42,7 +42,6 @@ go_library( deps = [ "//api:go_default_library", "//api/apiutil:go_default_library", - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//api/server/structs:go_default_library", "//beacon-chain/core/helpers:go_default_library", diff --git a/validator/client/beacon-api/beacon_api_node_client.go b/validator/client/beacon-api/beacon_api_node_client.go index 086d757415..5eeb92ce03 100644 --- a/validator/client/beacon-api/beacon_api_node_client.go +++ b/validator/client/beacon-api/beacon_api_node_client.go @@ -4,7 +4,6 @@ import ( "context" "strconv" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" "github.com/OffchainLabs/prysm/v6/api/server/structs" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/validator/client/iface" @@ -22,7 +21,6 @@ type beaconApiNodeClient struct { fallbackClient iface.NodeClient jsonRestHandler RestHandler genesisProvider GenesisProvider - healthTracker health.Tracker } func (c *beaconApiNodeClient) SyncStatus(ctx context.Context, _ *empty.Empty) (*ethpb.SyncStatus, error) { @@ -104,11 +102,11 @@ func (c *beaconApiNodeClient) Peers(ctx 
context.Context, in *empty.Empty) (*ethp } func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool { - return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil -} - -func (c *beaconApiNodeClient) HealthTracker() health.Tracker { - return c.healthTracker + if err := c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil); err != nil { + log.WithError(err).Error("failed to get health of node") + return false + } + return true } func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient { @@ -117,6 +115,5 @@ func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface fallbackClient: fallbackClient, genesisProvider: &beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, } - b.healthTracker = health.NewTracker(b) return b } diff --git a/validator/client/grpc-api/BUILD.bazel b/validator/client/grpc-api/BUILD.bazel index b0d0f2bbce..f8eda10448 100644 --- a/validator/client/grpc-api/BUILD.bazel +++ b/validator/client/grpc-api/BUILD.bazel @@ -12,7 +12,6 @@ go_library( visibility = ["//validator:__subpackages__"], deps = [ "//api/client:go_default_library", - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//api/server/structs:go_default_library", "//beacon-chain/rpc/eth/helpers:go_default_library", diff --git a/validator/client/grpc-api/grpc_node_client.go b/validator/client/grpc-api/grpc_node_client.go index 94f5f2e48c..9ec755f2be 100644 --- a/validator/client/grpc-api/grpc_node_client.go +++ b/validator/client/grpc-api/grpc_node_client.go @@ -3,7 +3,6 @@ package grpc_api import ( "context" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/validator/client/iface" "github.com/golang/protobuf/ptypes/empty" @@ -16,8 +15,7 @@ var ( ) type grpcNodeClient struct { - nodeClient ethpb.NodeClient - healthTracker health.Tracker + 
nodeClient ethpb.NodeClient } func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) { @@ -39,18 +37,13 @@ func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Pee func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool { _, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{}) if err != nil { - log.WithError(err).Debug("failed to get health of node") + log.WithError(err).Error("failed to get health of node") return false } return true } -func (c *grpcNodeClient) HealthTracker() health.Tracker { - return c.healthTracker -} - func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient { g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)} - g.healthTracker = health.NewTracker(g) return g } diff --git a/validator/client/health_monitor.go b/validator/client/health_monitor.go new file mode 100644 index 0000000000..110a63e363 --- /dev/null +++ b/validator/client/health_monitor.go @@ -0,0 +1,107 @@ +package client + +import ( + "context" + "sync" + "time" + + "github.com/OffchainLabs/prysm/v6/async/event" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/validator/client/iface" + "github.com/sirupsen/logrus" +) + +type healthMonitor struct { + ctx context.Context + cancel context.CancelFunc + v iface.Validator + maxFails int + healthyCh chan bool // emits true → healthy, false → unhealthy + healthEventFeed *event.Feed + fails int + isHealthy bool + sync.RWMutex +} + +// newHealthMonitor +func newHealthMonitor( + parentCtx context.Context, + parentCancel context.CancelFunc, + maxFails int, + v iface.Validator, +) *healthMonitor { + m := &healthMonitor{ + ctx: parentCtx, + cancel: parentCancel, + maxFails: maxFails, + v: v, + healthyCh: make(chan bool), + healthEventFeed: new(event.Feed), + } + m.healthEventFeed.Subscribe(m.healthyCh) + return m +} + +func (m *healthMonitor) IsHealthy() bool { + m.RLock() + defer m.RUnlock() + return m.isHealthy +} +
+func (m *healthMonitor) performHealthCheck() { + ishealthy := m.v.FindHealthyHost(m.ctx) + m.Lock() + defer m.Unlock() + if ishealthy { + m.fails = 0 + } else if m.maxFails > 0 && m.fails < m.maxFails { + log.WithFields(logrus.Fields{ + "fails": m.fails, + "maxFails": m.maxFails, + }).Warn("Failed health check, beacon node is unresponsive") + m.fails++ + } else if m.maxFails > 0 && m.fails >= m.maxFails { + log.WithField("maxFails", m.maxFails).Warn("Maximum health checks reached. Stopping health check routine") + m.isHealthy = ishealthy + m.cancel() + return + } + if ishealthy == m.isHealthy { + // Status is unchanged since the previous check, so no event is published. + log.WithField("isHealthy", m.isHealthy).Debug("Health status did not change") + return + } + log.WithFields(logrus.Fields{ + "healthy": ishealthy, + "previously": m.isHealthy, + }).Info("Health status changed") + m.isHealthy = ishealthy + go m.healthEventFeed.Send(ishealthy) // sent from a new goroutine so this method does not wait on subscribers +} + +func (m *healthMonitor) loop() { + log.Debug("Starting health check routine for beacon node apis") + interval := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second + ticker := time.NewTicker(interval) + + for ; true; <-ticker.C { // first pass runs immediately; each later pass waits for a tick + if m.ctx.Err() != nil { + log.Debug("Context canceled, stopping health checking") + return + } + m.performHealthCheck() + } +} + +// Start launches the monitor loop in its own goroutine and returns immediately. +func (m *healthMonitor) Start() { + go m.loop() +} + +// Stop invokes the monitor's cancel function; the loop exits the next time it checks the context. It does not close healthyCh. +func (m *healthMonitor) Stop() { + m.cancel() +} + +// HealthyChan exposes health status changes (true = healthy). The monitor never closes this channel, including on Stop().
+func (m *healthMonitor) HealthyChan() <-chan bool { return m.healthyCh } diff --git a/validator/client/health_monitor_test.go b/validator/client/health_monitor_test.go new file mode 100644 index 0000000000..51bb4fe85f --- /dev/null +++ b/validator/client/health_monitor_test.go @@ -0,0 +1,255 @@ +package client + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/OffchainLabs/prysm/v6/async/event" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + "github.com/OffchainLabs/prysm/v6/config/params" + validatormock "github.com/OffchainLabs/prysm/v6/testing/validator-mock" +) + +// TestHealthMonitor_IsHealthy_Concurrency reads IsHealthy from several goroutines at once, first while healthy and then after isHealthy is forced to false. +func TestHealthMonitor_IsHealthy_Concurrency(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockValidator := validatormock.NewMockValidator(ctrl) + // Short-lived context handed to the monitor; its cancel func is also registered as test cleanup. + parentCtx, parentCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + t.Cleanup(parentCancel) + + // Expectation for the single health check the loop runs immediately after Start (newHealthMonitor itself does not call FindHealthyHost). + mockValidator.EXPECT().FindHealthyHost(gomock.Any()).Return(true).Times(1) + + monitor := newHealthMonitor(parentCtx, parentCancel, 3, mockValidator) + require.NotNil(t, monitor) + monitor.Start() + time.Sleep(100 * time.Millisecond) // give the loop goroutine time to run its first check + + var wg sync.WaitGroup + numGoroutines := 10 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.True(t, monitor.IsHealthy()) + }() + } + wg.Wait() + + // Force isHealthy to false under the write lock, then repeat the concurrent reads. + monitor.Lock() + monitor.isHealthy = false + monitor.Unlock() + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + assert.False(t, monitor.IsHealthy()) + }() + } + wg.Wait() +} + +// TestHealthMonitor_PerformHealthCheck tests the core logic of a single health check.
+func TestHealthMonitor_PerformHealthCheck(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockValidator := validatormock.NewMockValidator(ctrl) + + tests := []struct { + expectStatusUpdate bool // true if healthyCh should receive a new, different status + expectCancelCalled bool + expectedIsHealthy bool + findHealthyHostReturns bool + initialIsHealthy bool + expectedFails int + maxFails int + initialFails int + name string + }{ + { + name: "Becomes Unhealthy", + initialIsHealthy: true, + initialFails: 0, + maxFails: 3, + findHealthyHostReturns: false, + expectedIsHealthy: false, + expectedFails: 1, + expectCancelCalled: false, + expectStatusUpdate: true, + }, + { + name: "Becomes Healthy", + initialIsHealthy: false, + initialFails: 1, + maxFails: 3, + findHealthyHostReturns: true, + expectedIsHealthy: true, + expectedFails: 0, + expectCancelCalled: false, + expectStatusUpdate: true, + }, + { + name: "Remains Healthy", + initialIsHealthy: true, + initialFails: 0, + maxFails: 3, + findHealthyHostReturns: true, + expectedIsHealthy: true, + expectedFails: 0, + expectCancelCalled: false, + expectStatusUpdate: false, // Status did not change + }, + { + name: "Remains Unhealthy", + initialIsHealthy: false, + initialFails: 1, + maxFails: 3, + findHealthyHostReturns: false, + expectedIsHealthy: false, + expectedFails: 2, + expectCancelCalled: false, + expectStatusUpdate: false, // Status did not change + }, + { + name: "Max Fails Reached - Stays Unhealthy and Cancels", + initialIsHealthy: false, + initialFails: 2, // Already equal to maxFails, so this failed check triggers cancel + maxFails: 2, + findHealthyHostReturns: false, + expectedIsHealthy: false, + expectedFails: 2, + expectCancelCalled: true, + expectStatusUpdate: false, // Status was already false, no new update sent before cancel + }, + { + name: "MaxFails is 0 - Remains Unhealthy, No Cancel", + initialIsHealthy: false, + initialFails: 100, // Arbitrarily high + maxFails: 0, // 0 disables the limit: both failure branches require maxFails > 0 + findHealthyHostReturns: false,
+ expectedIsHealthy: false, + expectedFails: 100, // fails is not incremented when maxFails is 0 + expectCancelCalled: false, + expectStatusUpdate: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + monitorCtx, monitorCancelFunc := context.WithCancel(context.Background()) + var actualCancelFuncCalled bool + testCancelCallback := func() { + actualCancelFuncCalled = true + monitorCancelFunc() // Propagate the cancellation to monitorCtx + } + + monitor := &healthMonitor{ + ctx: monitorCtx, // Context passed to FindHealthyHost + cancel: testCancelCallback, // Records whether performHealthCheck called m.cancel() + v: mockValidator, + maxFails: tt.maxFails, + healthyCh: make(chan bool, 1), + fails: tt.initialFails, + isHealthy: tt.initialIsHealthy, + healthEventFeed: new(event.Feed), + } + monitor.healthEventFeed.Subscribe(monitor.healthyCh) + + mockValidator.EXPECT().FindHealthyHost(gomock.Any()).Return(tt.findHealthyHostReturns) + + monitor.performHealthCheck() + + assert.Equal(t, tt.expectedIsHealthy, monitor.IsHealthy(), "isHealthy mismatch") + assert.Equal(t, tt.expectedFails, monitor.fails, "fails count mismatch") + assert.Equal(t, tt.expectCancelCalled, actualCancelFuncCalled, "cancelCalled mismatch") + + if tt.expectStatusUpdate { + assert.Eventually(t, func() bool { + select { + case s := <-monitor.HealthyChan(): + return s == tt.expectedIsHealthy + default: + return false + } + }, 100*time.Millisecond, 10*time.Millisecond) // total wait, poll interval + } else { + assert.Never(t, func() bool { + select { + case <-monitor.HealthyChan(): + return true // received something: fail + default: + return false + } + }, 100*time.Millisecond, 10*time.Millisecond) + } + if !actualCancelFuncCalled { + monitorCancelFunc() // Clean up context if not cancelled by test logic + } + }) + } +} + +// TestHealthMonitor_HealthyChan_ReceivesUpdates tests channel behavior.
+func TestHealthMonitor_HealthyChan_ReceivesUpdates(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockValidator := validatormock.NewMockValidator(ctrl) + monitorCtx, monitorCancelFunc := context.WithCancel(context.Background()) + + originalSecPerSlot := params.BeaconConfig().SecondsPerSlot + params.BeaconConfig().SecondsPerSlot = 1 // 1 sec interval for test + defer func() { + params.BeaconConfig().SecondsPerSlot = originalSecPerSlot + monitorCancelFunc() // Ensure monitor context is cleaned up + }() + + monitor := newHealthMonitor(monitorCtx, monitorCancelFunc, 3, mockValidator) + require.NotNil(t, monitor) + + ch := monitor.HealthyChan() + require.NotNil(t, ch) + + first := mockValidator.EXPECT(). + FindHealthyHost(gomock.Any()). + Return(true).Times(1) + + mockValidator.EXPECT(). + FindHealthyHost(gomock.Any()). + Return(false). + AnyTimes(). + After(first) + + monitor.Start() + + // Consume initial prime value (true) + select { + case status := <-ch: + assert.True(t, status, "Expected initial status to be true") + case <-time.After(100 * time.Millisecond): + t.Fatal("Timeout waiting for initial status") + } + + // Expect 'false' from the first check in Start's loop + select { + case status := <-ch: + assert.False(t, status, "Expected status to change to false") + case <-time.After(2 * time.Second): // Timeout for tick + processing + t.Fatal("Timeout waiting for status change to false") + } + + // 4. 
Stop the monitor + monitor.Stop() // This calls monitorCancelFunc +} diff --git a/validator/client/iface/BUILD.bazel b/validator/client/iface/BUILD.bazel index e354d496c1..c2e38d2b41 100644 --- a/validator/client/iface/BUILD.bazel +++ b/validator/client/iface/BUILD.bazel @@ -12,7 +12,6 @@ go_library( importpath = "github.com/OffchainLabs/prysm/v6/validator/client/iface", visibility = ["//visibility:public"], deps = [ - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//config/fieldparams:go_default_library", "//config/proposer:go_default_library", diff --git a/validator/client/iface/node_client.go b/validator/client/iface/node_client.go index 7b293d78ae..2062703460 100644 --- a/validator/client/iface/node_client.go +++ b/validator/client/iface/node_client.go @@ -3,7 +3,6 @@ package iface import ( "context" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/golang/protobuf/ptypes/empty" ) @@ -13,5 +12,5 @@ type NodeClient interface { Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) - HealthTracker() health.Tracker + IsHealthy(ctx context.Context) bool } diff --git a/validator/client/iface/validator.go b/validator/client/iface/validator.go index 4e091fb6d3..f66b2a2e4b 100644 --- a/validator/client/iface/validator.go +++ b/validator/client/iface/validator.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" "github.com/OffchainLabs/prysm/v6/api/client/event" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/proposer" @@ -69,9 +68,8 @@ type Validator interface { Graffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) ([]byte, error) SetGraffiti(ctx context.Context, 
pubKey [fieldparams.BLSPubkeyLength]byte, graffiti []byte) error DeleteGraffiti(ctx context.Context, pubKey [fieldparams.BLSPubkeyLength]byte) error - HealthTracker() health.Tracker Host() string - ChangeHost() + FindHealthyHost(ctx context.Context) bool } // SigningFunc interface defines a type for the function that signs a message diff --git a/validator/client/runner.go b/validator/client/runner.go index 60bcd03229..342c07b246 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -7,8 +7,6 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/api/client" - "github.com/OffchainLabs/prysm/v6/api/client/event" - "github.com/OffchainLabs/prysm/v6/config/features" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -25,24 +23,26 @@ import ( // Time to wait before trying to reconnect with beacon node. var backOffPeriod = 10 * time.Second -// Run the main validator routine. This routine exits if the context is -// canceled. +// runner encapsulates the main validator routine. +type runner struct { + validator iface.Validator + healthMonitor *healthMonitor +} + +// newRunner creates a new runner instance and performs all necessary initialization. +// This function can return an error if initialization fails. // // Order of operations: // 1 - Initialize validator data // 2 - Wait for validator activation -// 3 - Wait for the next slot start -// 4 - Update assignments -// 5 - Determine role at current slot -// 6 - Perform assigned role, if any -func run(ctx context.Context, v iface.Validator) { - cleanup := v.Done - defer cleanup() - +func newRunner(ctx context.Context, v iface.Validator, monitor *healthMonitor) (*runner, error) { + // Initialize validator and get head slot headSlot, err := initializeValidatorAndGetHeadSlot(ctx, v) if err != nil { - return // Exit if context is canceled. 
+ v.Done() + return nil, err } + // Prepare initial duties update ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1)) if err != nil { log.WithError(err).Error("Failed to get epoch start") @@ -51,11 +51,10 @@ func run(ctx context.Context, v iface.Validator) { startDeadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1) startCtx, startCancel := context.WithDeadline(ctx, startDeadline) if err := v.UpdateDuties(startCtx); err != nil { + // Don't return error here, just log it handleAssignmentError(err, headSlot) } startCancel() - healthTracker := v.HealthTracker() - runHealthCheckRoutine(ctx, v) // check if proposer settings is still nil // Set properties on the beacon node like the fee recipient for validators that are being used & active. @@ -64,16 +63,38 @@ func run(ctx context.Context, v iface.Validator) { " and will continue to use settings provided in the beacon node.") } if err := v.PushProposerSettings(ctx, headSlot, true); err != nil { - log.WithError(err).Fatal("Failed to update proposer settings") + v.Done() + return nil, errors.Wrap(err, "failed to update proposer settings") } + + return &runner{ + validator: v, + healthMonitor: monitor, + }, nil +} + +// run executes the main validator routine. This routine exits if the context is +// canceled. It returns a channel that will be closed when the routine exits. +// +// Order of operations: +// 1 - Wait for the next slot start +// 2 - Update assignments if needed +// 3 - Determine role at current slot +// 4 - Perform assigned role, if any +func (r *runner) run(ctx context.Context) { + v := r.validator + cleanup := v.Done + defer cleanup() + for { select { case <-ctx.Done(): log.Info("Context canceled, stopping validator") return // Exit if context is canceled. 
case slot := <-v.NextSlot(): - if !healthTracker.IsHealthy(ctx) { - continue + if !r.healthMonitor.IsHealthy() { + log.Warn("Beacon node unhealthy, stopping runner") + return } deadline := v.SlotDeadline(slot) @@ -126,27 +147,6 @@ func run(ctx context.Context, v iface.Validator) { // performRoles calls span.End() rolesCtx, _ := context.WithDeadline(ctx, deadline) performRoles(rolesCtx, allRoles, v, slot, &wg, span) - case isHealthyAgain := <-healthTracker.HealthUpdates(): - if isHealthyAgain { - headSlot, err = initializeValidatorAndGetHeadSlot(ctx, v) - if err != nil { - log.WithError(err).Error("Failed to re initialize validator and get head slot") - continue - } - ss, err := slots.EpochStart(slots.ToEpoch(headSlot + 1)) - if err != nil { - log.WithError(err).Error("Failed to get epoch start") - continue - } - deadline := v.SlotDeadline(ss + params.BeaconConfig().SlotsPerEpoch - 1) - dutiesCtx, dutiesCancel := context.WithDeadline(ctx, deadline) - if err := v.UpdateDuties(dutiesCtx); err != nil { - handleAssignmentError(err, headSlot) - dutiesCancel() - continue - } - dutiesCancel() - } case e := <-v.EventsChan(): v.ProcessEvent(ctx, e) case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot @@ -203,13 +203,11 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( continue } - log.WithError(err).Fatal("Could not determine if beacon chain started") + return 0, errors.Wrap(err, "could not determine if beacon chain started") } if err := v.WaitForKeymanagerInitialization(ctx); err != nil { - // log.Fatal will prevent defer from being called - v.Done() - log.WithError(err).Fatal("Wallet is not ready") + return 0, errors.Wrap(err, "Wallet is not ready") } if err := v.WaitForSync(ctx); err != nil { @@ -218,21 +216,21 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( continue } - log.WithError(err).Fatal("Could not determine if beacon node synced") + return 0, 
errors.Wrap(err, "could not determine if beacon node synced") } if err := v.WaitForActivation(ctx); err != nil { - log.WithError(err).Fatal("Could not wait for validator activation") + return 0, errors.Wrap(err, "could not wait for validator activation") } headSlot, err = v.CanonicalHeadSlot(ctx) if isConnectionError(err) { - log.WithError(err).Warn("Could not get current canonical head slot") + log.WithError(err).Warn("could not get current canonical head slot") continue } if err != nil { - log.WithError(err).Fatal("Could not get current canonical head slot") + return 0, errors.Wrap(err, "could not get current canonical head slot") } if err := v.CheckDoppelGanger(ctx); err != nil { @@ -241,7 +239,7 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( continue } - log.WithError(err).Fatal("Could not succeed with doppelganger check") + return 0, errors.Wrap(err, "could not succeed with doppelganger check") } break } @@ -309,40 +307,3 @@ func handleAssignmentError(err error, slot primitives.Slot) { log.WithError(err).Error("Failed to update assignments") } } - -func runHealthCheckRoutine(ctx context.Context, v iface.Validator) { - log.Info("Starting health check routine for beacon node apis") - healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second) - tracker := v.HealthTracker() - go func() { - // trigger the healthcheck immediately the first time - for ; true; <-healthCheckTicker.C { - if ctx.Err() != nil { - log.WithError(ctx.Err()).Error("Context cancelled") - return - } - isHealthy := tracker.CheckHealth(ctx) - if !isHealthy && features.Get().EnableBeaconRESTApi { - v.ChangeHost() - if !tracker.CheckHealth(ctx) { - continue // Skip to the next ticker - } - - slot, err := v.CanonicalHeadSlot(ctx) - if err != nil { - log.WithError(err).Error("Could not get canonical head slot") - return - } - if err := v.PushProposerSettings(ctx, slot, true); err != nil { - 
log.WithError(err).Warn("Failed to update proposer settings") - } - } - - // in case of node returning healthy but event stream died - if isHealthy && !v.EventStreamIsRunning() { - log.Info("Event stream reconnecting...") - go v.StartEventStream(ctx, event.DefaultEventTopics) - } - } - }() -} diff --git a/validator/client/runner_test.go b/validator/client/runner_test.go index dd3b408aa5..675a37cfe8 100644 --- a/validator/client/runner_test.go +++ b/validator/client/runner_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" "github.com/OffchainLabs/prysm/v6/async/event" "github.com/OffchainLabs/prysm/v6/cache/lru" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" @@ -42,47 +41,46 @@ func cancelledContext() context.Context { return ctx } +// Helper function to run the validator runner for tests +func runTest(t *testing.T, ctx context.Context, v iface.Validator) { + r, err := newRunner(ctx, v, &healthMonitor{isHealthy: true}) + require.NoError(t, err) + r.run(ctx) +} + func TestCancelledContext_CleansUpValidator(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) v := &testutil.FakeValidator{ - Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, - Tracker: tracker, + Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, } - run(cancelledContext(), v) + runTest(t, cancelledContext(), v) assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called") } func TestCancelledContext_WaitsForChainStart(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) v := &testutil.FakeValidator{ - Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, - Tracker: tracker, + Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, } - run(cancelledContext(), v) + runTest(t, cancelledContext(), v) assert.Equal(t, 
1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called") } func TestRetry_On_ConnectionError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) retry := 10 - node.EXPECT().IsHealthy(gomock.Any()).Return(true) v := &testutil.FakeValidator{ Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, - Tracker: tracker, RetryTillSuccess: retry, } backOffPeriod = 10 * time.Millisecond + ctx, cancel := context.WithCancel(t.Context()) - go run(ctx, v) + go runTest(t, ctx, v) + // each step will fail (retry times)=10 this sleep times will wait more then // the time it takes for all steps to succeed before main loop. time.Sleep(time.Duration(retry*6) * backOffPeriod) @@ -97,25 +95,17 @@ func TestRetry_On_ConnectionError(t *testing.T) { func TestCancelledContext_WaitsForActivation(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) v := &testutil.FakeValidator{ - Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, - Tracker: tracker, + Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, } - run(cancelledContext(), v) + runTest(t, cancelledContext(), v) assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called") } func TestUpdateDuties_NextSlot(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker} + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := 
context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -127,7 +117,7 @@ func TestUpdateDuties_NextSlot(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) require.Equal(t, true, v.UpdateDutiesCalled, "Expected UpdateAssignments(%d) to be called", slot) } @@ -136,12 +126,8 @@ func TestUpdateDuties_HandlesError(t *testing.T) { hook := logTest.NewGlobal() ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker} + + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -154,7 +140,7 @@ func TestUpdateDuties_HandlesError(t *testing.T) { }() v.UpdateDutiesRet = errors.New("bad") - run(ctx, v) + runTest(t, ctx, v) require.LogsContain(t, hook, "Failed to update assignments") } @@ -162,12 +148,8 @@ func TestUpdateDuties_HandlesError(t *testing.T) { func TestRoleAt_NextSlot(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker} + + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -179,7 +161,7 @@ func 
TestRoleAt_NextSlot(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) require.Equal(t, true, v.RoleAtCalled, "Expected RoleAt(%d) to be called", slot) assert.Equal(t, uint64(slot), v.RoleAtArg1, "RoleAt called with the wrong arg") @@ -188,13 +170,9 @@ func TestRoleAt_NextSlot(t *testing.T) { func TestAttests_NextSlot(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) + attSubmitted := make(chan interface{}) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker, AttSubmitted: attSubmitted} + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, AttSubmitted: attSubmitted} ctx, cancel := context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -206,7 +184,7 @@ func TestAttests_NextSlot(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) <-attSubmitted require.Equal(t, true, v.AttestToBlockHeadCalled, "SubmitAttestation(%d) was not called", slot) assert.Equal(t, uint64(slot), v.AttestToBlockHeadArg1, "SubmitAttestation was called with wrong arg") @@ -215,13 +193,8 @@ func TestAttests_NextSlot(t *testing.T) { func TestProposes_NextSlot(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) blockProposed := make(chan interface{}) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: 
tracker, BlockProposed: blockProposed} + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, BlockProposed: blockProposed} ctx, cancel := context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -233,7 +206,7 @@ func TestProposes_NextSlot(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) <-blockProposed require.Equal(t, true, v.ProposeBlockCalled, "ProposeBlock(%d) was not called", slot) @@ -243,14 +216,10 @@ func TestProposes_NextSlot(t *testing.T) { func TestBothProposesAndAttests_NextSlot(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - // avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy - _ = tracker.CheckHealth(t.Context()) + blockProposed := make(chan interface{}) attSubmitted := make(chan interface{}) - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker, BlockProposed: blockProposed, AttSubmitted: attSubmitted} + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, BlockProposed: blockProposed, AttSubmitted: attSubmitted} ctx, cancel := context.WithCancel(t.Context()) slot := primitives.Slot(55) @@ -262,7 +231,7 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) <-blockProposed <-attSubmitted require.Equal(t, true, v.AttestToBlockHeadCalled, "SubmitAttestation(%d) was not called", slot) @@ -276,10 +245,8 @@ func TestKeyReload_ActiveKey(t *testing.T) { km := &mockKeymanager{} ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - v := &testutil.FakeValidator{Km: km, Tracker: tracker, 
AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} + + v := &testutil.FakeValidator{Km: km, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} current := [][fieldparams.BLSPubkeyLength]byte{testutil.ActiveKey} onAccountsChanged(ctx, v, current) assert.Equal(t, true, v.HandleKeyReloadCalled) @@ -294,10 +261,8 @@ func TestKeyReload_NoActiveKey(t *testing.T) { km := &mockKeymanager{} ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - v := &testutil.FakeValidator{Km: km, Tracker: tracker, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} + + v := &testutil.FakeValidator{Km: km, AccountsChannel: make(chan [][fieldparams.BLSPubkeyLength]byte)} current := [][fieldparams.BLSPubkeyLength]byte{na} onAccountsChanged(ctx, v, current) assert.Equal(t, true, v.HandleKeyReloadCalled) @@ -320,10 +285,8 @@ func notActive(t *testing.T) [fieldparams.BLSPubkeyLength]byte { func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) - tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() - v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker} + + v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} err := v.SetProposerSettings(t.Context(), &proposer.Settings{ DefaultConfig: &proposer.Option{ FeeRecipientConfig: &proposer.FeeRecipientConfig{ @@ -343,20 +306,17 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) assert.LogsContain(t, hook, "updated proposer settings") } func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - node := health.NewMockHealthClient(ctrl) 
- tracker := health.NewTracker(node) - node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes() + v := &testutil.FakeValidator{ Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second, - Tracker: tracker, } err := v.SetProposerSettings(t.Context(), &proposer.Settings{ DefaultConfig: &proposer.Option{ @@ -376,7 +336,7 @@ func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) { cancel() }() - run(ctx, v) + runTest(t, ctx, v) // can't test "Failed to update proposer settings" because of log.fatal assert.LogsContain(t, hook, "Mock updated proposer settings") } @@ -566,11 +526,9 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) { defer assertValidContext(t, timedCtx, ctx) delay(t) }).AnyTimes() - hcm := health.NewMockHealthClient(ctrl) - hcm.EXPECT().IsHealthy(liveCtx).Return(true).AnyTimes().Do(func(_ any) { delay(t) }) ncm := validatormock.NewMockNodeClient(ctrl) ncm.EXPECT().SyncStatus(liveCtx, gomock.Any()).Return(ðpb.SyncStatus{Syncing: false}, nil) - ncm.EXPECT().HealthTracker().Return(health.NewTracker(hcm)).AnyTimes() + ccm := validatormock.NewMockChainClient(ctrl) ccm.EXPECT().ChainHead(liveCtx, gomock.Any()).Return(ðpb.ChainHead{}, nil).Do(func(_, _ any) { delay(t) }) @@ -606,5 +564,5 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) { submittedAggregates: make(map[submittedAttKey]*submittedAtt), } - run(timedCtx, v) + runTest(t, timedCtx, v) } diff --git a/validator/client/service.go b/validator/client/service.go index ef2ad1acf3..22487b386b 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -55,12 +55,14 @@ type ValidatorService struct { interopKeysConfig *local.InteropKeymanagerConfig web3SignerConfig *remoteweb3signer.SetupConfig proposerSettings *proposer.Settings + maxHealthChecks int validatorsRegBatchSize int enableAPI bool emitAccountMetrics bool logValidatorPerformance bool distributed bool 
disableDutiesPolling bool + closeClientFunc func() // validator client stop function is used here } // Config for the validator service. @@ -69,6 +71,7 @@ type Config struct { DB db.Database Wallet *wallet.Wallet WalletInitializedFeed *event.Feed + MaxHealthChecks int GRPCMaxCallRecvMsgSize int GRPCRetries uint GRPCRetryDelay time.Duration @@ -88,6 +91,7 @@ type Config struct { EmitAccountMetrics bool Distributed bool DisableDutiesPolling bool + CloseClientFunc func() } // NewValidatorService creates a new validator service for the service @@ -112,6 +116,8 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e logValidatorPerformance: cfg.LogValidatorPerformance, distributed: cfg.Distributed, disableDutiesPolling: cfg.DisableDutiesPolling, + closeClientFunc: cfg.CloseClientFunc, + maxHealthChecks: cfg.MaxHealthChecks, } dialOpts := ConstructDialOptions( @@ -186,7 +192,7 @@ func (v *ValidatorService) Start() { validatorClient := validatorclientfactory.NewValidatorClient(v.conn, restHandler) - valStruct := &validator{ + v.validator = &validator{ slotFeed: new(event.Feed), startBalances: make(map[[fieldparams.BLSPubkeyLength]byte]uint64), prevEpochBalances: make(map[[fieldparams.BLSPubkeyLength]byte]uint64), @@ -227,17 +233,45 @@ func (v *ValidatorService) Start() { eventsChannel: make(chan *eventClient.Event, 1), } - v.validator = valStruct - go run(v.ctx, v.validator) + hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator) + hm.Start() + defer v.closeClientFunc() + + for { + select { + case <-v.ctx.Done(): + log.Info("Validator service context canceled, stopping") + return + case isHealthy := <-hm.HealthyChan(): + if !isHealthy { + // wait until the next health tracker update + log.Warn("Validator service health check failed, waiting for healthy beacon node...") + continue + } + + log.Info("Starting validator runner") + runnerCtx, runnerCancel := context.WithCancel(v.ctx) + + runner, err := newRunner(runnerCtx, 
v.validator, hm) + if err != nil { + log.WithError(err).Error("Could not create validator runner") + runnerCancel() // Ensure context is cancelled + return + } + + go v.validator.StartEventStream(runnerCtx, eventClient.DefaultEventTopics) + + runner.run(runnerCtx) + // run is finished if we get to this point + runnerCancel() + } + } } // Stop the validator service. func (v *ValidatorService) Stop() error { v.cancel() log.Info("Stopping service") - if v.conn != nil { - return v.conn.GetGrpcClientConn().Close() - } return nil } diff --git a/validator/client/testutil/BUILD.bazel b/validator/client/testutil/BUILD.bazel index a8d0aef276..2ebc08e194 100644 --- a/validator/client/testutil/BUILD.bazel +++ b/validator/client/testutil/BUILD.bazel @@ -11,7 +11,6 @@ go_library( visibility = ["//validator:__subpackages__"], deps = [ "//api/client:go_default_library", - "//api/client/beacon/health:go_default_library", "//api/client/event:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", diff --git a/validator/client/testutil/mock_validator.go b/validator/client/testutil/mock_validator.go index 14435c0203..2c07c711b9 100644 --- a/validator/client/testutil/mock_validator.go +++ b/validator/client/testutil/mock_validator.go @@ -7,7 +7,6 @@ import ( "time" api "github.com/OffchainLabs/prysm/v6/api/client" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" "github.com/OffchainLabs/prysm/v6/api/client/event" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" @@ -24,49 +23,49 @@ var _ iface.Validator = (*FakeValidator)(nil) // FakeValidator for mocking. 
type FakeValidator struct { - IsRegularDeadline bool + CanChangeHost bool LogValidatorGainsAndLossesCalled bool SaveProtectionsCalled bool DeleteProtectionCalled bool SlotDeadlineCalled bool HandleKeyReloadCalled bool - WaitForWalletInitializationCalled bool - SlasherReadyCalled bool - NextSlotCalled bool AttestToBlockHeadCalled bool + SlasherReadyCalled bool DoneCalled bool + RoleAtCalled bool + IsRegularDeadline bool ProposeBlockCalled bool UpdateProtectionsCalled bool UpdateDutiesCalled bool - RoleAtCalled bool - IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte - PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64 - PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus - ProposerSettingWait time.Duration - NextSlotRet <-chan primitives.Slot - UpdateDutiesArg1 uint64 - RoleAtArg1 uint64 - AttestToBlockHeadArg1 uint64 - ProposeBlockArg1 uint64 - RetryTillSuccess int - Balances map[[fieldparams.BLSPubkeyLength]byte]uint64 - CanonicalHeadSlotCalled int + WaitForWalletInitializationCalled bool + NextSlotCalled bool WaitForActivationCalled int + CanonicalHeadSlotCalled int WaitForSyncCalled int + RetryTillSuccess int + ProposeBlockArg1 uint64 + AttestToBlockHeadArg1 uint64 + RoleAtArg1 uint64 + UpdateDutiesArg1 uint64 + NextSlotRet <-chan primitives.Slot + ProposerSettingWait time.Duration + PubkeysToStatusesMap map[[fieldparams.BLSPubkeyLength]byte]ethpb.ValidatorStatus + PubkeyToIndexMap map[[fieldparams.BLSPubkeyLength]byte]uint64 + IndexToPubkeyMap map[uint64][fieldparams.BLSPubkeyLength]byte WaitForChainStartCalled int AttSubmitted chan interface{} BlockProposed chan interface{} AccountsChannel chan [][fieldparams.BLSPubkeyLength]byte - EventsChannel chan *event.Event GenesisT uint64 ReceiveBlocksCalled int proposerSettings *proposer.Settings - UpdateDutiesRet error + Balances map[[48]byte]uint64 + EventsChannel chan *event.Event ProposerSettingsErr error Km keymanager.IKeymanager graffiti string - Tracker 
health.Tracker PublicKey string + UpdateDutiesRet error RolesAtRet []iface.ValidatorRole } @@ -337,14 +336,10 @@ func (*FakeValidator) EventStreamIsRunning() bool { return true } -func (fv *FakeValidator) HealthTracker() health.Tracker { - return fv.Tracker -} - func (*FakeValidator) Host() string { return "127.0.0.1:0" } -func (fv *FakeValidator) ChangeHost() { - fv.Host() +func (fv *FakeValidator) FindHealthyHost(_ context.Context) bool { + return fv.CanChangeHost } diff --git a/validator/client/validator.go b/validator/client/validator.go index 247e260109..4fb3493d8a 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -16,7 +16,6 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/api/client" - "github.com/OffchainLabs/prysm/v6/api/client/beacon/health" eventClient "github.com/OffchainLabs/prysm/v6/api/client/event" "github.com/OffchainLabs/prysm/v6/api/server/structs" "github.com/OffchainLabs/prysm/v6/async/event" @@ -1151,6 +1150,10 @@ func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Sl } func (v *validator) StartEventStream(ctx context.Context, topics []string) { + if v.EventStreamIsRunning() { + log.Debug("EventStream is already running") + return + } log.WithField("topics", topics).Info("Starting event stream") v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel) } @@ -1242,25 +1245,41 @@ func (v *validator) EventStreamIsRunning() bool { return v.validatorClient.EventStreamIsRunning() } -func (v *validator) HealthTracker() health.Tracker { - return v.nodeClient.HealthTracker() -} - func (v *validator) Host() string { return v.validatorClient.Host() } -func (v *validator) ChangeHost() { - if len(v.beaconNodeHosts) == 1 { - log.Infof("Beacon node at %s is not responding, no backup node configured", v.Host()) - return - } +func (v *validator) changeHost() { next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts)) - log.Infof("Beacon node at %s is not responding, switching to 
%s...", v.beaconNodeHosts[v.currentHostIndex], v.beaconNodeHosts[next]) + log.WithFields(logrus.Fields{ + "currentHost": v.beaconNodeHosts[v.currentHostIndex], + "nextHost": v.beaconNodeHosts[next], + }).Warn("Beacon node is not responding, switching host") v.validatorClient.SetHost(v.beaconNodeHosts[next]) v.currentHostIndex = next } +func (v *validator) FindHealthyHost(ctx context.Context) bool { + // Tail-recursive closure keeps retry count private. + var check func(remaining int) bool + check = func(remaining int) bool { + if v.nodeClient.IsHealthy(ctx) { // healthy → done + return true + } + if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi { + log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured") + return false + } + if remaining == 0 || !features.Get().EnableBeaconRESTApi { + return false // exhausted or REST disabled + } + v.changeHost() + return check(remaining - 1) // recurse + } + + return check(len(v.beaconNodeHosts)) +} + func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) { ctx, span := trace.StartSpan(ctx, "validator.filterAndCacheActiveKeys") defer span.End() diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index d55db4389a..e81592b055 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -2838,9 +2838,9 @@ func TestValidator_ChangeHost(t *testing.T) { client.EXPECT().SetHost(v.beaconNodeHosts[1]) client.EXPECT().SetHost(v.beaconNodeHosts[0]) - v.ChangeHost() + v.changeHost() assert.Equal(t, uint64(1), v.currentHostIndex) - v.ChangeHost() + v.changeHost() assert.Equal(t, uint64(0), v.currentHostIndex) } diff --git a/validator/node/node.go b/validator/node/node.go index 1726494441..e5b34b1678 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -58,6 +58,7 @@ type 
ValidatorClient struct { wallet *wallet.Wallet walletInitializedFeed *event.Feed stop chan struct{} // Channel to wait for termination notifications. + once sync.Once } // NewValidatorClient creates a new instance of the Prysm validator client. @@ -161,13 +162,15 @@ func (c *ValidatorClient) Start() { // Close handles graceful shutdown of the system. func (c *ValidatorClient) Close() { - c.lock.Lock() - defer c.lock.Unlock() + c.once.Do(func() { // runs exactly one time + c.lock.Lock() + defer c.lock.Unlock() - c.services.StopAll() - log.Info("Stopping Prysm validator") - c.cancel() - close(c.stop) + c.services.StopAll() + log.Info("Stopping Prysm validator") + c.cancel() + close(c.stop) + }) } // checkLegacyDatabaseLocation checks is a database exists in the specified location. @@ -441,6 +444,8 @@ func (c *ValidatorClient) registerValidatorService(cliCtx *cli.Context) error { LogValidatorPerformance: !cliCtx.Bool(flags.DisablePenaltyRewardLogFlag.Name), EmitAccountMetrics: !cliCtx.Bool(flags.DisableAccountMetricsFlag.Name), Distributed: cliCtx.Bool(flags.EnableDistributed.Name), + CloseClientFunc: c.Close, + MaxHealthChecks: cliCtx.Int(flags.MaxHealthChecksFlag.Name), }) if err != nil { return errors.Wrap(err, "could not initialize validator service")