mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-04-19 03:01:06 -04:00
<!-- Thanks for sending a PR! Before submitting: 1. If this is your first PR, check out our contribution guide here https://docs.prylabs.network/docs/contribute/contribution-guidelines You will then need to sign our Contributor License Agreement (CLA), which will show up as a comment from a bot in this pull request after you open it. We cannot review code without a signed CLA. 2. Please file an associated tracking issue if this pull request is non-trivial and requires context for our team to understand. All features and most bug fixes should have an associated issue with a design discussed and decided upon. Small bug fixes and documentation improvements don't need issues. 3. New features and bug fixes must have tests. Documentation may need to be updated. If you're unsure what to update, send the PR, and we'll discuss in review. 4. Note that PRs updating dependencies and new Go versions are not accepted. Please file an issue instead. 5. A changelog entry is required for user facing issues. --> **What type of PR is this?** ## Summary This PR implements gRPC fallback support for the validator client, allowing it to automatically switch between multiple beacon node endpoints when the primary node becomes unavailable or unhealthy. 
## Changes - Added `grpcConnectionProvider` to manage multiple gRPC connections with circular failover - Validator automatically detects unhealthy beacon nodes and switches to the next available endpoint - Health checks verify both node responsiveness AND sync status before accepting a node - Improved logging to only show "Found fully synced beacon node" when an actual switch occurs (reduces log noise) I removed the old middleware that used gRPC's built-in load balancer because: - gRPC's pick_first load balancer doesn't provide sync-status-aware failover - The validator needs to ensure it connects to a fully synced node, not just a reachable one ## Test Scenario ### Setup Deployed a 4-node Kurtosis testnet with a local validator connecting to 2 beacon nodes: ```yaml # kurtosis-grpc-fallback-test.yaml participants: - el_type: nethermind cl_type: prysm validator_count: 128 # Keeps chain advancing - el_type: nethermind cl_type: prysm validator_count: 64 - el_type: nethermind cl_type: prysm validator_count: 64 # Keeps chain advancing - el_type: nethermind cl_type: prysm validator_count: 64 # Keeps chain advancing network_params: fulu_fork_epoch: 0 seconds_per_slot: 6 ``` Local validator started with: ```bash ./validator --beacon-rpc-provider=127.0.0.1:33005,127.0.0.1:33012 ... ``` ### Test 1: Primary Failover (cl-1 → cl-2) 1. Stopped cl-1 beacon node 2. Validator detected failure and switched to cl-2 **Logs:** ``` WARN Beacon node is not responding, switching host currentHost=127.0.0.1:33005 nextHost=127.0.0.1:33012 DEBUG Trying gRPC endpoint newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005 INFO Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33005] newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005 ``` **Result:** ✅ PASSED - Validator continued submitting attestations on cl-2 ### Test 2: Circular Failover (cl-2 → cl-1) 1. Restarted cl-1, stopped cl-2 2. 
Validator detected failure and switched back to cl-1 **Logs:** ``` WARN Beacon node is not responding, switching host currentHost=127.0.0.1:33012 nextHost=127.0.0.1:33005 DEBUG Trying gRPC endpoint newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012 INFO Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33012] newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012 ``` **Result:** ✅ PASSED - Circular fallback works correctly ## Key Log Messages | Log Level | Message | Source | |-----------|---------|--------| | WARN | "Beacon node is not responding, switching host" | `changeHost()` in validator.go | | INFO | "Switched gRPC endpoint" | `SetHost()` in grpc_connection_provider.go | | INFO | "Found fully synced beacon node" | `FindHealthyHost()` in validator.go (only on actual switch) | ## Test Plan - [x] Verify primary failover (cl-1 → cl-2) - [x] Verify circular failover (cl-2 → cl-1) - [x] Verify validator continues producing attestations after switch - [x] Verify "Found fully synced beacon node" only logs on actual switch (not every health check) **What does this PR do? Why is it needed?** **Which issues(s) does this PR fix?** Fixes # https://github.com/OffchainLabs/prysm/pull/7133 **Other notes for review** **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable). --------- Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com> Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
317 lines
10 KiB
Go
317 lines
10 KiB
Go
package rest
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/api"
|
|
"github.com/OffchainLabs/prysm/v7/api/apiutil"
|
|
"github.com/OffchainLabs/prysm/v7/config/params"
|
|
"github.com/OffchainLabs/prysm/v7/network/httputil"
|
|
"github.com/OffchainLabs/prysm/v7/runtime/version"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// reqOption mutates an outgoing request just before it is sent. restHandler keeps a list of
// these in reqOverrides and applies them in the SSZ request paths (GetSSZ, PostSSZ).
type reqOption func(*http.Request)
|
|
|
|
// RestHandler defines the interface for making REST API requests.
//
// The per-method notes below describe the restHandler implementation in this file; other
// implementations (e.g. test mocks) are presumably expected to follow the same contract.
type RestHandler interface {
	// Get sends a GET request to endpoint and decodes a JSON response body into resp.
	Get(ctx context.Context, endpoint string, resp any) error
	// GetStatusCode sends a GET request to endpoint and returns only the HTTP status code.
	GetStatusCode(ctx context.Context, endpoint string) (int, error)
	// GetSSZ sends a GET request preferring an SSZ response and returns the raw body and headers.
	GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error)
	// Post sends data as a JSON POST body and decodes a JSON response body into resp.
	Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp any) error
	// PostSSZ sends data as an SSZ POST body and returns the raw response body and headers.
	PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
	// HttpClient returns the HTTP client used to perform requests.
	HttpClient() *http.Client
	// Host returns the base URL that endpoint paths are appended to.
	Host() string
	// SwitchHost replaces the base URL used for subsequent requests.
	SwitchHost(host string)
}
|
|
|
|
// restHandler is the default RestHandler implementation. Each request URL is built by
// concatenating host and the endpoint path.
//
// NOTE(review): host is read by every request method and written by SwitchHost without any
// synchronization — confirm callers never switch hosts concurrently with in-flight requests.
type restHandler struct {
	// client performs every request; HttpClient hands out a pointer to this field.
	client http.Client
	// host is the base URL prefixed to every endpoint; replaced by SwitchHost.
	host string
	// reqOverrides are applied by GetSSZ and PostSSZ after the Accept header is set but before
	// User-Agent (and, for PostSSZ, Content-Type), so they can replace Accept but not those two.
	reqOverrides []reqOption
}
|
|
|
|
// newRestHandler returns a RestHandler (internal use). It simply forwards to NewRestHandler
// with the same arguments.
func newRestHandler(client http.Client, host string) RestHandler {
	return NewRestHandler(client, host)
}
|
|
|
|
// NewRestHandler returns a RestHandler
|
|
func NewRestHandler(client http.Client, host string) RestHandler {
|
|
rh := &restHandler{
|
|
client: client,
|
|
host: host,
|
|
}
|
|
rh.appendAcceptOverride()
|
|
return rh
|
|
}
|
|
|
|
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
|
|
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
|
|
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
|
|
func (c *restHandler) appendAcceptOverride() {
|
|
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
|
|
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
|
|
req.Header.Set("Accept", accept)
|
|
})
|
|
}
|
|
}
|
|
|
|
// HttpClient returns the underlying HTTP client of the handler.
// The pointer refers to the handler's own client field (not a copy), so changes made through
// it affect the requests this handler sends afterwards.
func (c *restHandler) HttpClient() *http.Client {
	return &c.client
}
|
|
|
|
// Host returns the base URL currently prefixed to every endpoint path.
// It reflects the most recent value passed to SwitchHost, or the constructor's host.
func (c *restHandler) Host() string {
	return c.host
}
|
|
|
|
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
|
|
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
|
|
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
|
|
url := c.host + endpoint
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
|
}
|
|
req.Header.Set("User-Agent", version.BuildData())
|
|
httpResp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
|
}
|
|
defer func() {
|
|
if err := httpResp.Body.Close(); err != nil {
|
|
return
|
|
}
|
|
}()
|
|
|
|
return decodeResp(httpResp, resp)
|
|
}
|
|
|
|
// GetStatusCode sends a GET request and returns only the HTTP status code.
|
|
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
|
|
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
|
|
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
|
|
url := c.host + endpoint
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return 0, errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
|
}
|
|
req.Header.Set("User-Agent", version.BuildData())
|
|
httpResp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return 0, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
|
}
|
|
defer func() {
|
|
if err := httpResp.Body.Close(); err != nil {
|
|
return
|
|
}
|
|
}()
|
|
return httpResp.StatusCode, nil
|
|
}
|
|
|
|
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
|
|
url := c.host + endpoint
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
|
}
|
|
|
|
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
|
|
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
|
|
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
|
|
req.Header.Set("Accept", acceptHeaderString)
|
|
|
|
for _, o := range c.reqOverrides {
|
|
o(req)
|
|
}
|
|
|
|
req.Header.Set("User-Agent", version.BuildData())
|
|
httpResp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
|
}
|
|
defer func() {
|
|
if err := httpResp.Body.Close(); err != nil {
|
|
return
|
|
}
|
|
}()
|
|
accept := req.Header.Get("Accept")
|
|
contentType := httpResp.Header.Get("Content-Type")
|
|
body, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
|
|
}
|
|
|
|
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
|
|
log.WithFields(logrus.Fields{
|
|
"Accept": accept,
|
|
"Content-Type": contentType,
|
|
}).Debug("Server responded with non primary accept type")
|
|
}
|
|
|
|
// non-2XX codes are a failure
|
|
if !strings.HasPrefix(httpResp.Status, "2") {
|
|
decoder := json.NewDecoder(bytes.NewBuffer(body))
|
|
errorJson := &httputil.DefaultJsonError{}
|
|
if err = decoder.Decode(errorJson); err != nil {
|
|
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
|
|
}
|
|
return nil, nil, errorJson
|
|
}
|
|
|
|
return body, httpResp.Header, nil
|
|
}
|
|
|
|
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
|
|
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
|
|
func (c *restHandler) Post(
|
|
ctx context.Context,
|
|
apiEndpoint string,
|
|
headers map[string]string,
|
|
data *bytes.Buffer,
|
|
resp any,
|
|
) error {
|
|
if data == nil {
|
|
return errors.New("data is nil")
|
|
}
|
|
|
|
url := c.host + apiEndpoint
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
|
}
|
|
|
|
for headerKey, headerValue := range headers {
|
|
req.Header.Set(headerKey, headerValue)
|
|
}
|
|
req.Header.Set("Content-Type", api.JsonMediaType)
|
|
req.Header.Set("User-Agent", version.BuildData())
|
|
httpResp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
|
}
|
|
defer func() {
|
|
if err = httpResp.Body.Close(); err != nil {
|
|
return
|
|
}
|
|
}()
|
|
|
|
return decodeResp(httpResp, resp)
|
|
}
|
|
|
|
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
|
|
func (c *restHandler) PostSSZ(
|
|
ctx context.Context,
|
|
apiEndpoint string,
|
|
headers map[string]string,
|
|
data *bytes.Buffer,
|
|
) ([]byte, http.Header, error) {
|
|
if data == nil {
|
|
return nil, nil, errors.New("data is nil")
|
|
}
|
|
url := c.host + apiEndpoint
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
|
}
|
|
|
|
// Accept header: prefer octet-stream (SSZ), fall back to JSON
|
|
primaryAcceptType := fmt.Sprintf("%s;q=%s", api.OctetStreamMediaType, "0.95")
|
|
secondaryAcceptType := fmt.Sprintf("%s;q=%s", api.JsonMediaType, "0.9")
|
|
acceptHeaderString := fmt.Sprintf("%s,%s", primaryAcceptType, secondaryAcceptType)
|
|
req.Header.Set("Accept", acceptHeaderString)
|
|
|
|
// User-supplied headers
|
|
for headerKey, headerValue := range headers {
|
|
req.Header.Set(headerKey, headerValue)
|
|
}
|
|
|
|
for _, o := range c.reqOverrides {
|
|
o(req)
|
|
}
|
|
req.Header.Set("Content-Type", api.OctetStreamMediaType)
|
|
req.Header.Set("User-Agent", version.BuildData())
|
|
httpResp, err := c.client.Do(req)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
|
}
|
|
defer func() {
|
|
if err := httpResp.Body.Close(); err != nil {
|
|
return
|
|
}
|
|
}()
|
|
|
|
accept := req.Header.Get("Accept")
|
|
contentType := httpResp.Header.Get("Content-Type")
|
|
body, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
return nil, nil, errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
|
|
}
|
|
|
|
if !apiutil.PrimaryAcceptMatches(accept, contentType) {
|
|
log.WithFields(logrus.Fields{
|
|
"Accept": accept,
|
|
"Content-Type": contentType,
|
|
}).Debug("Server responded with non primary accept type")
|
|
}
|
|
|
|
// non-2XX codes are a failure
|
|
if !strings.HasPrefix(httpResp.Status, "2") {
|
|
decoder := json.NewDecoder(bytes.NewBuffer(body))
|
|
errorJson := &httputil.DefaultJsonError{}
|
|
if err = decoder.Decode(errorJson); err != nil {
|
|
return nil, nil, fmt.Errorf("HTTP request for %s unsuccessful (%d: %s)", httpResp.Request.URL, httpResp.StatusCode, string(body))
|
|
}
|
|
return nil, nil, errorJson
|
|
}
|
|
|
|
return body, httpResp.Header, nil
|
|
}
|
|
|
|
func decodeResp(httpResp *http.Response, resp any) error {
|
|
body, err := io.ReadAll(httpResp.Body)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to read response body for %s", httpResp.Request.URL)
|
|
}
|
|
|
|
if !strings.Contains(httpResp.Header.Get("Content-Type"), api.JsonMediaType) {
|
|
// 2XX codes are a success
|
|
if strings.HasPrefix(httpResp.Status, "2") {
|
|
return nil
|
|
}
|
|
return &httputil.DefaultJsonError{Code: httpResp.StatusCode, Message: string(body)}
|
|
}
|
|
|
|
decoder := json.NewDecoder(bytes.NewBuffer(body))
|
|
// non-2XX codes are a failure
|
|
if !strings.HasPrefix(httpResp.Status, "2") {
|
|
errorJson := &httputil.DefaultJsonError{}
|
|
if err = decoder.Decode(errorJson); err != nil {
|
|
return errors.Wrapf(err, "failed to decode response body into error json for %s", httpResp.Request.URL)
|
|
}
|
|
return errorJson
|
|
}
|
|
// resp is nil for requests that do not return anything.
|
|
if resp != nil {
|
|
if err = decoder.Decode(resp); err != nil {
|
|
return errors.Wrapf(err, "failed to decode response body into json for %s", httpResp.Request.URL)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SwitchHost replaces the base URL used for all subsequent requests made by this handler.
//
// NOTE(review): c.host is written here and read by every request method (Get, GetSSZ, Post, ...)
// without synchronization. If SwitchHost can run concurrently with in-flight requests (e.g.
// during beacon-node failover), this is a data race — confirm callers serialize access.
func (c *restHandler) SwitchHost(host string) {
	c.host = host
}
|