Files
prysm/api/rest/rest_handler.go
james-prysm 641d90990d grpc fallback improvements (#16215)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

## Summary

This PR implements gRPC fallback support for the validator client,
allowing it to automatically switch between multiple beacon node
endpoints when the primary node becomes unavailable or unhealthy.

## Changes

- Added `grpcConnectionProvider` to manage multiple gRPC connections
with circular failover
- Validator automatically detects unhealthy beacon nodes and switches to
the next available endpoint
- Health checks verify both node responsiveness AND sync status before
accepting a node
- Improved logging to only show "Found fully synced beacon node" when an
actual switch occurs (reduces log noise)


I removed the old middleware that uses gRPC's built in load balancer
because:

- gRPC's pick_first load balancer doesn't provide sync-status-aware
failover
- The validator needs to ensure it connects to a fully synced node, not
just a reachable one

## Test Scenario

### Setup
Deployed a 4-node Kurtosis testnet with local validator connecting to 2
beacon nodes:

```yaml
# kurtosis-grpc-fallback-test.yaml
participants:
  - el_type: nethermind
    cl_type: prysm
    validator_count: 128  # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing

network_params:
  fulu_fork_epoch: 0
  seconds_per_slot: 6
```

Local validator started with:
```bash
./validator --beacon-rpc-provider=127.0.0.1:33005,127.0.0.1:33012 ...
```

### Test 1: Primary Failover (cl-1 → cl-2)

1. Stopped cl-1 beacon node
2. Validator detected failure and switched to cl-2

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33005 nextHost=127.0.0.1:33012
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33005] newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
```

**Result:**  PASSED - Validator continued submitting attestations on
cl-2

### Test 2: Circular Failover (cl-2 → cl-1)

1. Restarted cl-1, stopped cl-2
2. Validator detected failure and switched back to cl-1

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33012 nextHost=127.0.0.1:33005
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33012] newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
```

**Result:**  PASSED - Circular fallback works correctly

## Key Log Messages

| Log Level | Message | Source |
|-----------|---------|--------|
| WARN | "Beacon node is not responding, switching host" |
`changeHost()` in validator.go |
| INFO | "Switched gRPC endpoint" | `SetHost()` in
grpc_connection_provider.go |
| INFO | "Found fully synced beacon node" | `FindHealthyHost()` in
validator.go (only on actual switch) |

## Test Plan

- [x] Verify primary failover (cl-1 → cl-2)
- [x] Verify circular failover (cl-2 → cl-1)
- [x] Verify validator continues producing attestations after switch
- [x] Verify "Found fully synced beacon node" only logs on actual switch
(not every health check)

**What does this PR do? Why is it needed?**

**Which issues(s) does this PR fix?**

Fixes # https://github.com/OffchainLabs/prysm/pull/7133


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-02 14:51:56 +00:00

317 lines
10 KiB
Go

package rest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"github.com/OffchainLabs/prysm/v7/api"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/network/httputil"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SwitchHost(host string)
}
type restHandler struct {
client http.Client
host string
reqOverrides []reqOption
}
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
client: client,
host: host,
}
rh.appendAcceptOverride()
return rh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *restHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
})
}
}
// HttpClient returns the underlying HTTP client of the handler
func (c *restHandler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *restHandler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
return decodeResp(httpResp, resp)
}
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return 0, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return 0, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
return httpResp.StatusCode, nil
}
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
req.Header.Set("Accept", acceptHeaderString)
for _, o := range c.reqOverrides {
o(req)
}
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
accept := req.Header.Get("Accept")
contentType := httpResp.Header.Get("Content-Type")
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
}
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
log.WithFields(logrus.Fields{
"Accept": accept,
"Content-Type": contentType,
}).Debug("Server responded with non primary accept type")
}
// non-2XX codes are a failure
if !strings.HasPrefix(httpResp.Status, "2") {
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
}
return nil, nil, errorJson
}
return body, httpResp.Header, nil
}
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
data *bytes.Buffer,
resp any,
) error {
if data == nil {
return errors.New("data is nil")
}
url := c.host + apiEndpoint
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
if err != nil {
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
for headerKey, headerValue := range headers {
req.Header.Set(headerKey, headerValue)
}
req.Header.Set("Content-Type", api.JsonMediaType)
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err = httpResp.Body.Close(); err != nil {
return
}
}()
return decodeResp(httpResp, resp)
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *restHandler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
data *bytes.Buffer,
) ([]byte, http.Header, error) {
if data == nil {
return nil, nil, errors.New("data is nil")
}
url := c.host + apiEndpoint
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
}
// Accept header: prefer octet-stream (SSZ), fall back to JSON
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
req.Header.Set("Accept", acceptHeaderString)
// User-supplied headers
for headerKey, headerValue := range headers {
req.Header.Set(headerKey, headerValue)
}
for _, o := range c.reqOverrides {
o(req)
}
req.Header.Set("Content-Type", api.OctetStreamMediaType)
req.Header.Set("User-Agent", version.BuildData())
httpResp, err := c.client.Do(req)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
}
defer func() {
if err := httpResp.Body.Close(); err != nil {
return
}
}()
accept := req.Header.Get("Accept")
contentType := httpResp.Header.Get("Content-Type")
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
}
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
log.WithFields(logrus.Fields{
"Accept": accept,
"Content-Type": contentType,
}).Debug("Server responded with non primary accept type")
}
// non-2XX codes are a failure
if !strings.HasPrefix(httpResp.Status, "2") {
decoder := json.NewDecoder(bytes.NewBuffer(body))
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
}
return nil, nil, errorJson
}
return body, httpResp.Header, nil
}
func decodeResp(httpResp *http.Response, resp any) error {
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
}
if !strings.Contains(httpResp.Header.Get("Content-Type"), api.JsonMediaType) {
// 2XX codes are a success
if strings.HasPrefix(httpResp.Status, "2") {
return nil
}
return &httputil.DefaultJsonError{Code: httpResp.StatusCode, Message: string(body)}
}
decoder := json.NewDecoder(bytes.NewBuffer(body))
// non-2XX codes are a failure
if !strings.HasPrefix(httpResp.Status, "2") {
errorJson := &httputil.DefaultJsonError{}
if err = decoder.Decode(errorJson); err != nil {
return errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
}
return errorJson
}
// resp is nil for requests that do not return anything.
if resp != nil {
if err = decoder.Decode(resp); err != nil {
return errors.Wrapf(err, "failed to decode response body into json for %s", httpResp.Request.URL)
}
}
return nil
}
func (c *restHandler) SwitchHost(host string) {
c.host = host
}