mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-04-19 03:01:06 -04:00
**What type of PR is this?** Other **What does this PR do? Why is it needed?** Follow up to https://github.com/OffchainLabs/prysm/pull/16215 this pr improves logging, fixes stuttering in package naming, adds additional unit tests, and deduplicates fallback node code. **Which issues(s) does this PR fix?** fixes a potential race if reconnecting to the same host very quickly which has a stale connection still. **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable).
187 lines
4.9 KiB
Go
187 lines
4.9 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// GrpcConnectionProvider manages gRPC connections for failover support.
|
|
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
|
|
// Only one connection is maintained at a time - when switching hosts, the old connection is closed.
|
|
type GrpcConnectionProvider interface {
|
|
// CurrentConn returns the currently active gRPC connection.
|
|
// The connection is created lazily on first call.
|
|
// Returns nil if the provider has been closed.
|
|
CurrentConn() *grpc.ClientConn
|
|
// CurrentHost returns the address of the currently active endpoint.
|
|
CurrentHost() string
|
|
// Hosts returns all configured endpoint addresses.
|
|
Hosts() []string
|
|
// SwitchHost switches to the endpoint at the given index.
|
|
// The new connection is created lazily on next CurrentConn() call.
|
|
SwitchHost(index int) error
|
|
// ConnectionCounter returns a monotonically increasing counter that increments
|
|
// each time SwitchHost changes the active endpoint. This allows consumers to
|
|
// detect connection changes even when the host string returns to a previous value
|
|
// (e.g., host0 → host1 → host0).
|
|
ConnectionCounter() uint64
|
|
// Close closes the current connection.
|
|
Close()
|
|
}
|
|
|
|
type grpcConnectionProvider struct {
|
|
// Immutable after construction - no lock needed for reads
|
|
endpoints []string
|
|
ctx context.Context
|
|
dialOpts []grpc.DialOption
|
|
|
|
// Current connection state (protected by mutex)
|
|
currentIndex uint64
|
|
conn *grpc.ClientConn
|
|
connCounter uint64
|
|
|
|
mu sync.Mutex
|
|
closed bool
|
|
}
|
|
|
|
// NewGrpcConnectionProvider creates a new connection provider that manages gRPC connections.
|
|
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
|
|
// Only one connection is maintained at a time, created lazily on first use.
|
|
func NewGrpcConnectionProvider(
|
|
ctx context.Context,
|
|
endpoint string,
|
|
dialOpts []grpc.DialOption,
|
|
) (GrpcConnectionProvider, error) {
|
|
endpoints := parseEndpoints(endpoint)
|
|
if len(endpoints) == 0 {
|
|
return nil, errors.New("no gRPC endpoints provided")
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{
|
|
"endpoints": endpoints,
|
|
"count": len(endpoints),
|
|
}).Info("Initialized gRPC connection provider")
|
|
|
|
return &grpcConnectionProvider{
|
|
endpoints: endpoints,
|
|
ctx: ctx,
|
|
dialOpts: dialOpts,
|
|
}, nil
|
|
}
|
|
|
|
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
|
|
func parseEndpoints(endpoint string) []string {
|
|
if endpoint == "" {
|
|
return nil
|
|
}
|
|
endpoints := make([]string, 0, 1)
|
|
for p := range strings.SplitSeq(endpoint, ",") {
|
|
if p = strings.TrimSpace(p); p != "" {
|
|
endpoints = append(endpoints, p)
|
|
}
|
|
}
|
|
return endpoints
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if p.closed {
|
|
return nil
|
|
}
|
|
|
|
// Return existing connection if available
|
|
if p.conn != nil {
|
|
return p.conn
|
|
}
|
|
|
|
// Create connection lazily
|
|
ep := p.endpoints[p.currentIndex]
|
|
conn, err := grpc.DialContext(p.ctx, ep, p.dialOpts...)
|
|
if err != nil {
|
|
log.WithError(err).WithField("endpoint", ep).Error("Failed to create gRPC connection")
|
|
return nil
|
|
}
|
|
|
|
p.conn = conn
|
|
log.WithField("endpoint", ep).Debug("Created gRPC connection")
|
|
return conn
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) CurrentHost() string {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.endpoints[p.currentIndex]
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) Hosts() []string {
|
|
// Return a copy to maintain immutability
|
|
hosts := make([]string, len(p.endpoints))
|
|
copy(hosts, p.endpoints)
|
|
return hosts
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) SwitchHost(index int) error {
|
|
if index < 0 || index >= len(p.endpoints) {
|
|
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
|
|
}
|
|
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if uint64(index) == p.currentIndex {
|
|
return nil // Already on this host
|
|
}
|
|
|
|
oldHost := p.endpoints[p.currentIndex]
|
|
oldConn := p.conn
|
|
|
|
p.conn = nil // Clear immediately - new connection created lazily
|
|
p.currentIndex = uint64(index)
|
|
p.connCounter++
|
|
|
|
// Close old connection asynchronously to avoid blocking the caller
|
|
if oldConn != nil {
|
|
go func() {
|
|
if err := oldConn.Close(); err != nil {
|
|
log.WithError(err).WithField("endpoint", oldHost).Debug("Failed to close previous connection")
|
|
}
|
|
}()
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{
|
|
"previousHost": oldHost,
|
|
"newHost": p.endpoints[index],
|
|
}).Debug("Switched gRPC endpoint")
|
|
return nil
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) ConnectionCounter() uint64 {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
return p.connCounter
|
|
}
|
|
|
|
func (p *grpcConnectionProvider) Close() {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if p.closed {
|
|
return
|
|
}
|
|
p.closed = true
|
|
|
|
if p.conn != nil {
|
|
if err := p.conn.Close(); err != nil {
|
|
log.WithError(err).WithField("endpoint", p.endpoints[p.currentIndex]).Debug("Failed to close gRPC connection")
|
|
}
|
|
p.conn = nil
|
|
}
|
|
}
|