mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-04-19 03:01:06 -04:00
<!-- Thanks for sending a PR! Before submitting: 1. If this is your first PR, check out our contribution guide here https://docs.prylabs.network/docs/contribute/contribution-guidelines You will then need to sign our Contributor License Agreement (CLA), which will show up as a comment from a bot in this pull request after you open it. We cannot review code without a signed CLA. 2. Please file an associated tracking issue if this pull request is non-trivial and requires context for our team to understand. All features and most bug fixes should have an associated issue with a design discussed and decided upon. Small bug fixes and documentation improvements don't need issues. 3. New features and bug fixes must have tests. Documentation may need to be updated. If you're unsure what to update, send the PR, and we'll discuss in review. 4. Note that PRs updating dependencies and new Go versions are not accepted. Please file an issue instead. 5. A changelog entry is required for user facing issues. --> **What type of PR is this?** ## Summary This PR implements gRPC fallback support for the validator client, allowing it to automatically switch between multiple beacon node endpoints when the primary node becomes unavailable or unhealthy. 
## Changes - Added `grpcConnectionProvider` to manage multiple gRPC connections with circular failover - Validator automatically detects unhealthy beacon nodes and switches to the next available endpoint - Health checks verify both node responsiveness AND sync status before accepting a node - Improved logging to only show "Found fully synced beacon node" when an actual switch occurs (reduces log noise) I removed the old middleware that uses gRPC's built in load balancer because: - gRPC's pick_first load balancer doesn't provide sync-status-aware failover - The validator needs to ensure it connects to a fully synced node, not just a reachable one ## Test Scenario ### Setup Deployed a 4-node Kurtosis testnet with local validator connecting to 2 beacon nodes: ```yaml # kurtosis-grpc-fallback-test.yaml participants: - el_type: nethermind cl_type: prysm validator_count: 128 # Keeps chain advancing - el_type: nethermind cl_type: prysm validator_count: 64 - el_type: nethermind cl_type: prysm validator_count: 64 # Keeps chain advancing - el_type: nethermind cl_type: prysm validator_count: 64 # Keeps chain advancing network_params: fulu_fork_epoch: 0 seconds_per_slot: 6 ``` Local validator started with: ```bash ./validator --beacon-rpc-provider=127.0.0.1:33005,127.0.0.1:33012 ... ``` ### Test 1: Primary Failover (cl-1 → cl-2) 1. Stopped cl-1 beacon node 2. Validator detected failure and switched to cl-2 **Logs:** ``` WARN Beacon node is not responding, switching host currentHost=127.0.0.1:33005 nextHost=127.0.0.1:33012 DEBUG Trying gRPC endpoint newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005 INFO Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33005] newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005 ``` **Result:** ✅ PASSED - Validator continued submitting attestations on cl-2 ### Test 2: Circular Failover (cl-2 → cl-1) 1. Restarted cl-1, stopped cl-2 2. 
Validator detected failure and switched back to cl-1 **Logs:** ``` WARN Beacon node is not responding, switching host currentHost=127.0.0.1:33012 nextHost=127.0.0.1:33005 DEBUG Trying gRPC endpoint newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012 INFO Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33012] newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012 ``` **Result:** ✅ PASSED - Circular fallback works correctly ## Key Log Messages | Log Level | Message | Source | |-----------|---------|--------| | WARN | "Beacon node is not responding, switching host" | `changeHost()` in validator.go | | INFO | "Switched gRPC endpoint" | `SetHost()` in grpc_connection_provider.go | | INFO | "Found fully synced beacon node" | `FindHealthyHost()` in validator.go (only on actual switch) | ## Test Plan - [x] Verify primary failover (cl-1 → cl-2) - [x] Verify circular failover (cl-2 → cl-1) - [x] Verify validator continues producing attestations after switch - [x] Verify "Found fully synced beacon node" only logs on actual switch (not every health check) **What does this PR do? Why is it needed?** **Which issues(s) does this PR fix?** Fixes # https://github.com/OffchainLabs/prysm/pull/7133 **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --------- Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com> Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
208 lines
6.6 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/testing/assert"
|
|
"github.com/OffchainLabs/prysm/v7/testing/require"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
func TestParseEndpoints(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
input string
|
|
expected []string
|
|
}{
|
|
{"single endpoint", "localhost:4000", []string{"localhost:4000"}},
|
|
{"multiple endpoints", "host1:4000,host2:4000,host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
|
|
{"endpoints with spaces", "host1:4000, host2:4000 , host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
|
|
{"empty string", "", nil},
|
|
{"only commas", ",,,", []string{}},
|
|
{"trailing comma", "host1:4000,host2:4000,", []string{"host1:4000", "host2:4000"}},
|
|
{"leading comma", ",host1:4000,host2:4000", []string{"host1:4000", "host2:4000"}},
|
|
}
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
got := parseEndpoints(tt.input)
|
|
if !reflect.DeepEqual(tt.expected, got) {
|
|
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestNewGrpcConnectionProvider_Errors(t *testing.T) {
|
|
t.Run("no endpoints", func(t *testing.T) {
|
|
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
|
_, err := NewGrpcConnectionProvider(context.Background(), "", dialOpts)
|
|
require.ErrorContains(t, "no gRPC endpoints provided", err)
|
|
})
|
|
}
|
|
|
|
func TestGrpcConnectionProvider_LazyConnection(t *testing.T) {
|
|
// Start only one server but configure provider with two endpoints
|
|
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
server := grpc.NewServer()
|
|
go func() { _ = server.Serve(lis) }()
|
|
defer server.Stop()
|
|
|
|
validAddr := lis.Addr().String()
|
|
invalidAddr := "127.0.0.1:1" // Port 1 is unlikely to be listening
|
|
|
|
// Provider should succeed even though second endpoint is invalid (lazy connections)
|
|
endpoint := validAddr + "," + invalidAddr
|
|
ctx := context.Background()
|
|
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
|
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
|
|
require.NoError(t, err, "Provider creation should succeed with lazy connections")
|
|
defer func() { provider.Close() }()
|
|
|
|
// First endpoint should work
|
|
conn := provider.CurrentConn()
|
|
assert.NotNil(t, conn, "First connection should be created lazily")
|
|
}
|
|
|
|
func TestGrpcConnectionProvider_SingleConnectionModel(t *testing.T) {
|
|
// Create provider with 3 endpoints
|
|
var addrs []string
|
|
var servers []*grpc.Server
|
|
|
|
for range 3 {
|
|
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
server := grpc.NewServer()
|
|
go func() { _ = server.Serve(lis) }()
|
|
addrs = append(addrs, lis.Addr().String())
|
|
servers = append(servers, server)
|
|
}
|
|
defer func() {
|
|
for _, s := range servers {
|
|
s.Stop()
|
|
}
|
|
}()
|
|
|
|
endpoint := strings.Join(addrs, ",")
|
|
ctx := context.Background()
|
|
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
|
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
|
|
require.NoError(t, err)
|
|
defer func() { provider.Close() }()
|
|
|
|
// Access the internal state to verify single connection behavior
|
|
p := provider.(*grpcConnectionProvider)
|
|
|
|
// Initially no connection
|
|
p.mu.Lock()
|
|
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil before access")
|
|
p.mu.Unlock()
|
|
|
|
// Access connection - should create one
|
|
conn0 := provider.CurrentConn()
|
|
assert.NotNil(t, conn0)
|
|
|
|
p.mu.Lock()
|
|
assert.NotNil(t, p.conn, "Connection should be created after CurrentConn()")
|
|
firstConn := p.conn
|
|
p.mu.Unlock()
|
|
|
|
// Call CurrentConn again - should return same connection
|
|
conn0Again := provider.CurrentConn()
|
|
assert.Equal(t, conn0, conn0Again, "Should return same connection")
|
|
|
|
// Switch to different host - old connection should be closed, new one created lazily
|
|
require.NoError(t, provider.SwitchHost(1))
|
|
|
|
p.mu.Lock()
|
|
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil after SwitchHost (lazy)")
|
|
p.mu.Unlock()
|
|
|
|
// Get new connection
|
|
conn1 := provider.CurrentConn()
|
|
assert.NotNil(t, conn1)
|
|
assert.NotEqual(t, firstConn, conn1, "Should be a different connection after switching hosts")
|
|
}
|
|
|
|
// testProvider creates a provider with n test servers and returns cleanup function.
|
|
func testProvider(t *testing.T, n int) (GrpcConnectionProvider, []string, func()) {
|
|
var addrs []string
|
|
var cleanups []func()
|
|
|
|
for range n {
|
|
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
server := grpc.NewServer()
|
|
go func() { _ = server.Serve(lis) }()
|
|
addrs = append(addrs, lis.Addr().String())
|
|
cleanups = append(cleanups, server.Stop)
|
|
}
|
|
|
|
endpoint := strings.Join(addrs, ",")
|
|
|
|
ctx := context.Background()
|
|
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
|
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
|
|
require.NoError(t, err)
|
|
|
|
cleanup := func() {
|
|
provider.Close()
|
|
for _, c := range cleanups {
|
|
c()
|
|
}
|
|
}
|
|
return provider, addrs, cleanup
|
|
}
|
|
|
|
func TestGrpcConnectionProvider(t *testing.T) {
|
|
provider, addrs, cleanup := testProvider(t, 3)
|
|
defer cleanup()
|
|
|
|
t.Run("initial state", func(t *testing.T) {
|
|
assert.Equal(t, 3, len(provider.Hosts()))
|
|
assert.Equal(t, addrs[0], provider.CurrentHost())
|
|
assert.NotNil(t, provider.CurrentConn())
|
|
})
|
|
|
|
t.Run("SwitchHost", func(t *testing.T) {
|
|
require.NoError(t, provider.SwitchHost(1))
|
|
assert.Equal(t, addrs[1], provider.CurrentHost())
|
|
assert.NotNil(t, provider.CurrentConn()) // New connection created lazily
|
|
require.NoError(t, provider.SwitchHost(0))
|
|
assert.Equal(t, addrs[0], provider.CurrentHost())
|
|
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
|
|
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
|
|
})
|
|
|
|
t.Run("SwitchHost circular", func(t *testing.T) {
|
|
// Test round-robin style switching using SwitchHost with manual index
|
|
indices := []int{1, 2, 0, 1} // Simulate circular switching
|
|
for i, idx := range indices {
|
|
require.NoError(t, provider.SwitchHost(idx))
|
|
assert.Equal(t, addrs[idx], provider.CurrentHost(), "iteration %d", i)
|
|
}
|
|
})
|
|
|
|
t.Run("Hosts returns copy", func(t *testing.T) {
|
|
hosts := provider.Hosts()
|
|
original := hosts[0]
|
|
hosts[0] = "modified"
|
|
assert.Equal(t, original, provider.Hosts()[0])
|
|
})
|
|
}
|
|
|
|
func TestGrpcConnectionProvider_Close(t *testing.T) {
|
|
provider, _, cleanup := testProvider(t, 1)
|
|
defer cleanup()
|
|
|
|
assert.NotNil(t, provider.CurrentConn())
|
|
provider.Close()
|
|
assert.Equal(t, (*grpc.ClientConn)(nil), provider.CurrentConn())
|
|
provider.Close() // Double close is safe
|
|
}
|