having gRPC map more closely to rest implementation as well as only providing sync access to synced nodes

This commit is contained in:
james-prysm
2026-01-05 16:08:45 -06:00
parent 888db581dd
commit fc2dcb0e88
4 changed files with 88 additions and 50 deletions

View File

@@ -34,9 +34,20 @@ func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Pee
}
func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool {
// First check if the node is available via health endpoint
_, err := c.getClient().GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Error("Failed to get health of node")
log.WithError(err).Debug("Failed to get health of node")
return false
}
// Then check sync status - we only want fully synced nodes
syncStatus, err := c.getClient().GetSyncStatus(ctx, &empty.Empty{})
if err != nil {
log.WithError(err).Debug("Failed to get sync status of node")
return false
}
if syncStatus.Syncing {
log.Debug("Node is syncing, not fully synced")
return false
}
return true

View File

@@ -385,14 +385,22 @@ func (c *grpcValidatorClient) Host() string {
return c.grpcClientManager.conn.GetGrpcConnectionProvider().CurrentHost()
}
func (c *grpcValidatorClient) SetHost(_ string) {
// For gRPC, host switching is done via the connection provider's NextHost() method,
// not by setting a specific host string. The validator's changeHost() logic
// uses the connection provider directly.
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil || c.grpcClientManager.conn.GetGrpcConnectionProvider() == nil {
log.Debug("SetHost called but no gRPC connection provider configured")
func (c *grpcValidatorClient) SetHost(host string) {
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil {
return
}
// The actual host switching is managed by the connection provider
// which is called from FindHealthyHost/changeHost in validator.go
provider := c.grpcClientManager.conn.GetGrpcConnectionProvider()
if provider == nil {
return
}
// Find the index of the requested host and switch to it
for i, h := range provider.Hosts() {
if h == host {
if err := provider.SetHost(i); err != nil {
log.WithError(err).WithField("host", host).Error("Failed to set gRPC host")
}
return
}
}
log.WithField("host", host).Warn("Requested gRPC host not found in configured endpoints")
}

View File

@@ -134,9 +134,16 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
grpcProvider, err := validatorHelpers.NewGrpcConnectionProvider(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts)
if err != nil {
return s, errors.Wrap(err, "failed to create gRPC connection provider")
var grpcConn *grpc.ClientConn
var grpcProvider validatorHelpers.GrpcConnectionProvider
if cfg.BeaconNodeGRPCEndpoint != "" {
var err error
grpcProvider, err = validatorHelpers.NewGrpcConnectionProvider(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts)
if err != nil {
return s, errors.Wrap(err, "failed to create gRPC connection provider")
}
grpcConn = grpcProvider.CurrentConn()
}
if cfg.BeaconNodeCert != "" {
@@ -146,11 +153,13 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
connOpts := []validatorHelpers.NodeConnectionOption{
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
validatorHelpers.WithGrpcConnectionProvider(grpcProvider),
}
if grpcProvider != nil {
connOpts = append(connOpts, validatorHelpers.WithGrpcConnectionProvider(grpcProvider))
}
s.conn = validatorHelpers.NewNodeConnection(
grpcProvider.CurrentConn(),
grpcConn,
cfg.BeaconApiEndpoint,
connOpts...,
)

View File

@@ -1263,53 +1263,63 @@ func (v *validator) Host() string {
}
func (v *validator) changeHost() {
if features.Get().EnableBeaconRESTApi {
// REST API mode: use the beaconNodeHosts array and validatorClient.SetHost
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
log.WithFields(logrus.Fields{
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
"nextHost": v.beaconNodeHosts[next],
}).Warn("Beacon node is not responding, switching host")
v.validatorClient.SetHost(v.beaconNodeHosts[next])
v.currentHostIndex = next
} else if v.grpcConnectionProvider != nil {
// gRPC mode with multi-endpoint support: use the connection provider
v.grpcConnectionProvider.NextHost()
hosts := v.hosts()
if len(hosts) <= 1 {
return
}
next := (v.currentHostIndex + 1) % uint64(len(hosts))
log.WithFields(logrus.Fields{
"currentHost": hosts[v.currentHostIndex],
"nextHost": hosts[next],
}).Warn("Beacon node is not responding, switching host")
v.validatorClient.SetHost(hosts[next])
v.currentHostIndex = next
}
// hosts returns the list of configured beacon node hosts for failover.
func (v *validator) hosts() []string {
if features.Get().EnableBeaconRESTApi {
return v.beaconNodeHosts
}
if v.grpcConnectionProvider != nil {
return v.grpcConnectionProvider.Hosts()
}
return nil
}
// numHosts returns the number of configured beacon node hosts for failover.
func (v *validator) numHosts() int {
if features.Get().EnableBeaconRESTApi {
return len(v.beaconNodeHosts)
}
if v.grpcConnectionProvider != nil {
return len(v.grpcConnectionProvider.Hosts())
}
return 1 // Single endpoint, no failover
return len(v.hosts())
}
func (v *validator) FindHealthyHost(ctx context.Context) bool {
numHosts := v.numHosts()
hosts := v.hosts()
numHosts := len(hosts)
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsHealthy(ctx) { // healthy → done
return true
}
if numHosts == 1 {
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
return false
}
if remaining == 0 {
return false // exhausted all hosts
}
v.changeHost()
return check(remaining - 1) // recurse
if numHosts == 0 {
return false
}
return check(numHosts)
// Check all hosts for a fully synced node
for i := 0; i < numHosts; i++ {
if v.nodeClient.IsHealthy(ctx) {
log.WithField("host", v.Host()).Debug("Found healthy beacon node")
return true
}
log.WithField("host", v.Host()).Debug("Beacon node not fully synced")
// Try next host if not the last iteration
if i < numHosts-1 {
v.changeHost()
}
}
if numHosts == 1 {
log.WithField("host", v.Host()).Warn("Beacon node is not fully synced, no backup node configured")
} else {
log.Warn("No fully synced beacon node found")
}
return false
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {