Add sync details to error messages (#9750)

This commit is contained in:
Radosław Kapka
2021-10-07 20:50:03 +02:00
committed by GitHub
parent e7085897ad
commit b5f0bd88b0
13 changed files with 230 additions and 66 deletions

View File

@@ -125,12 +125,12 @@ func (m *ApiProxyMiddleware) handleApiPath(gatewayRouter *mux.Router, path strin
var respJson []byte
if !GrpcResponseIsEmpty(grpcRespBody) {
if errJson := DeserializeGrpcResponseBodyIntoErrorJson(endpoint.Err, grpcRespBody); errJson != nil {
respHasError, errJson := HandleGrpcResponseError(endpoint.Err, grpcResp, grpcRespBody, w)
if errJson != nil {
WriteError(w, errJson, nil)
return
}
if endpoint.Err.Msg() != "" {
HandleGrpcResponseError(endpoint.Err, grpcResp, w)
if respHasError {
return
}

View File

@@ -97,26 +97,25 @@ func ReadGrpcResponseBody(r io.Reader) ([]byte, ErrorJson) {
return body, nil
}
// DeserializeGrpcResponseBodyIntoErrorJson deserializes the body from the grpc-gateway's response into an error struct.
// The struct can be later examined to check if the request resulted in an error.
func DeserializeGrpcResponseBodyIntoErrorJson(errJson ErrorJson, body []byte) ErrorJson {
if err := json.Unmarshal(body, errJson); err != nil {
return InternalServerErrorWithMessage(err, "could not unmarshal error")
}
return nil
}
// HandleGrpcResponseError acts on an error that resulted from a grpc-gateway's response.
func HandleGrpcResponseError(errJson ErrorJson, resp *http.Response, w http.ResponseWriter) {
// Something went wrong, but the request completed, meaning we can write headers and the error message.
for h, vs := range resp.Header {
for _, v := range vs {
w.Header().Set(h, v)
}
func HandleGrpcResponseError(errJson ErrorJson, resp *http.Response, respBody []byte, w http.ResponseWriter) (bool, ErrorJson) {
responseHasError := false
if err := json.Unmarshal(respBody, errJson); err != nil {
return false, InternalServerErrorWithMessage(err, "could not unmarshal error")
}
// Set code to HTTP code because unmarshalled body contained gRPC code.
errJson.SetCode(resp.StatusCode)
WriteError(w, errJson, resp.Header)
if errJson.Msg() != "" {
responseHasError = true
// Something went wrong, but the request completed, meaning we can write headers and the error message.
for h, vs := range resp.Header {
for _, v := range vs {
w.Header().Set(h, v)
}
}
// Set code to HTTP code because unmarshalled body contained gRPC code.
errJson.SetCode(resp.StatusCode)
WriteError(w, errJson, resp.Header)
}
return responseHasError, nil
}
// GrpcResponseIsEmpty determines whether the grpc-gateway's response body contains no data.
@@ -201,9 +200,11 @@ func WriteMiddlewareResponseHeadersAndBody(grpcResp *http.Response, responseJson
// WriteError writes the error by manipulating headers and the body of the final response.
func WriteError(w http.ResponseWriter, errJson ErrorJson, responseHeader http.Header) {
// Include custom error in the error JSON.
hasCustomError := false
if responseHeader != nil {
customError, ok := responseHeader["Grpc-Metadata-"+grpc.CustomErrorMetadataKey]
if ok {
hasCustomError = true
// Assume header has only one value and read the 0 index.
if err := json.Unmarshal([]byte(customError[0]), errJson); err != nil {
log.WithError(err).Error("Could not unmarshal custom error message")
@@ -212,10 +213,29 @@ func WriteError(w http.ResponseWriter, errJson ErrorJson, responseHeader http.He
}
}
j, err := json.Marshal(errJson)
if err != nil {
log.WithError(err).Error("Could not marshal error message")
return
var j []byte
if hasCustomError {
var err error
j, err = json.Marshal(errJson)
if err != nil {
log.WithError(err).Error("Could not marshal error message")
return
}
} else {
var err error
// We marshal the response body into a DefaultErrorJson if the custom error is not present.
// This is because the ErrorJson argument is the endpoint's error definition, which may contain custom fields.
// In such a scenario marhaling the endpoint's error would populate the resulting JSON
// with these fields even if they are not present in the gRPC header.
d := &DefaultErrorJson{
Message: errJson.Msg(),
Code: errJson.StatusCode(),
}
j, err = json.Marshal(d)
if err != nil {
log.WithError(err).Error("Could not marshal error message")
return
}
}
w.Header().Set("Content-Length", strconv.Itoa(len(j)))

View File

@@ -63,6 +63,11 @@ func (e *testErrorJson) SetCode(code int) {
e.Code = code
}
// SetMsg sets the error's underlying message.
func (e *testErrorJson) SetMsg(msg string) {
e.Message = msg
}
func TestDeserializeRequestBodyIntoContainer(t *testing.T) {
t.Run("ok", func(t *testing.T) {
var bodyJson bytes.Buffer
@@ -156,29 +161,6 @@ func TestReadGrpcResponseBody(t *testing.T) {
assert.Equal(t, "foo", string(body))
}
func TestDeserializeGrpcResponseBodyIntoErrorJson(t *testing.T) {
t.Run("ok", func(t *testing.T) {
e := &testErrorJson{
Message: "foo",
Code: 500,
}
body, err := json.Marshal(e)
require.NoError(t, err)
eToDeserialize := &testErrorJson{}
errJson := DeserializeGrpcResponseBodyIntoErrorJson(eToDeserialize, body)
require.Equal(t, true, errJson == nil)
assert.Equal(t, "foo", eToDeserialize.Msg())
assert.Equal(t, 500, eToDeserialize.StatusCode())
})
t.Run("error", func(t *testing.T) {
errJson := DeserializeGrpcResponseBodyIntoErrorJson(nil, nil)
require.NotNil(t, errJson)
assert.Equal(t, true, strings.Contains(errJson.Msg(), "could not unmarshal error"))
})
}
func TestHandleGrpcResponseError(t *testing.T) {
response := &http.Response{
StatusCode: 400,
@@ -190,10 +172,14 @@ func TestHandleGrpcResponseError(t *testing.T) {
writer := httptest.NewRecorder()
errJson := &testErrorJson{
Message: "foo",
Code: 500,
Code: 400,
}
b, err := json.Marshal(errJson)
require.NoError(t, err)
HandleGrpcResponseError(errJson, response, writer)
hasError, e := HandleGrpcResponseError(errJson, response, b, writer)
require.Equal(t, true, e == nil)
assert.Equal(t, true, hasError)
v, ok := writer.Header()["Foo"]
require.Equal(t, true, ok, "header not found")
require.Equal(t, 1, len(v), "wrong number of header values")

View File

@@ -15,6 +15,7 @@ type ErrorJson interface {
StatusCode() int
SetCode(code int)
Msg() string
SetMsg(msg string)
}
// DefaultErrorJson is a JSON representation of a simple error value, containing only a message and an error code.
@@ -54,3 +55,8 @@ func (e *DefaultErrorJson) Msg() string {
func (e *DefaultErrorJson) SetCode(code int) {
e.Code = code
}
// SetMsg sets the error's underlying message.
func (e *DefaultErrorJson) SetMsg(msg string) {
e.Message = msg
}

View File

@@ -84,13 +84,13 @@ func handleGetSSZ(
apimiddleware.WriteError(w, errJson, nil)
return true
}
if errJson := apimiddleware.DeserializeGrpcResponseBodyIntoErrorJson(endpoint.Err, grpcResponseBody); errJson != nil {
respHasError, errJson := apimiddleware.HandleGrpcResponseError(endpoint.Err, grpcResponse, grpcResponseBody, w)
if errJson != nil {
apimiddleware.WriteError(w, errJson, nil)
return true
return
}
if endpoint.Err.Msg() != "" {
apimiddleware.HandleGrpcResponseError(endpoint.Err, grpcResponse, w)
return true
if respHasError {
return
}
if errJson := apimiddleware.DeserializeGrpcResponseBodyIntoContainer(grpcResponseBody, config.responseJson); errJson != nil {
apimiddleware.WriteError(w, errJson, nil)

View File

@@ -180,16 +180,19 @@ func (f *BeaconEndpointFactory) Create(path string) (*apimiddleware.Endpoint, er
endpoint.PostRequest = &dutiesRequestJson{}
endpoint.PostResponse = &attesterDutiesResponseJson{}
endpoint.RequestURLLiterals = []string{"epoch"}
endpoint.Err = &nodeSyncDetailsErrorJson{}
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapValidatorIndicesArray,
}
case "/eth/v1/validator/duties/proposer/{epoch}":
endpoint.GetResponse = &proposerDutiesResponseJson{}
endpoint.RequestURLLiterals = []string{"epoch"}
endpoint.Err = &nodeSyncDetailsErrorJson{}
case "/eth/v1/validator/duties/sync/{epoch}":
endpoint.PostRequest = &dutiesRequestJson{}
endpoint.PostResponse = &syncCommitteeDutiesResponseJson{}
endpoint.RequestURLLiterals = []string{"epoch"}
endpoint.Err = &nodeSyncDetailsErrorJson{}
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapValidatorIndicesArray,
}
@@ -212,11 +215,13 @@ func (f *BeaconEndpointFactory) Create(path string) (*apimiddleware.Endpoint, er
endpoint.RequestQueryParams = []apimiddleware.QueryParam{{Name: "attestation_data_root", Hex: true}, {Name: "slot"}}
case "/eth/v1/validator/beacon_committee_subscriptions":
endpoint.PostRequest = &submitBeaconCommitteeSubscriptionsRequestJson{}
endpoint.Err = &nodeSyncDetailsErrorJson{}
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapBeaconCommitteeSubscriptionsArray,
}
case "/eth/v1/validator/sync_committee_subscriptions":
endpoint.PostRequest = &submitSyncCommitteeSubscriptionRequestJson{}
endpoint.Err = &nodeSyncDetailsErrorJson{}
endpoint.Hooks = apimiddleware.HookCollection{
OnPreDeserializeRequestBodyIntoContainer: wrapSyncCommitteeSubscriptionsArray,
}

View File

@@ -751,7 +751,18 @@ type singleIndexedVerificationFailureJson struct {
Message string `json:"message"`
}
type nodeSyncDetailsErrorJson struct {
apimiddleware.DefaultErrorJson
SyncDetails syncDetails `json:"sync_details"`
}
type eventErrorJson struct {
StatusCode int `json:"status_code"`
Message string `json:"message"`
}
type syncDetails struct {
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
IsSyncing bool `json:"is_syncing"`
}

View File

@@ -4,13 +4,17 @@ go_library(
name = "go_default_library",
srcs = [
"error_handling.go",
"sync.go",
"validator_status.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/rpc/eth/helpers",
visibility = ["//beacon-chain/rpc/eth:__subpackages__"],
deps = [
"//api/grpc:go_default_library",
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/rpc/statefetcher:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//config/params:go_default_library",
"//proto/eth/v1:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@@ -22,15 +26,24 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["validator_status_test.go"],
srcs = [
"sync_test.go",
"validator_status_test.go",
],
embed = [":go_default_library"],
deps = [
"//api/grpc:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/state/v1:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//config/params:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/migration:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//runtime:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@@ -27,3 +27,15 @@ type SingleIndexedVerificationFailure struct {
Index int `json:"index"`
Message string `json:"message"`
}
// SyncDetails contain details about sync status.
type SyncDetails struct {
HeadSlot string `json:"head_slot"`
SyncDistance string `json:"sync_distance"`
IsSyncing bool `json:"is_syncing"`
}
// SyncDetailsContainer is a wrapper for SyncDetails.
type SyncDetailsContainer struct {
SyncDetails *SyncDetails `json:"sync_details"`
}

View File

@@ -0,0 +1,37 @@
package helpers
import (
"context"
"strconv"
"github.com/prysmaticlabs/prysm/api/grpc"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/sync"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ValidateSync checks whether the node is currently syncing and returns an error if it is.
// It also appends syncing info to gRPC headers.
func ValidateSync(ctx context.Context, syncChecker sync.Checker, headFetcher blockchain.HeadFetcher, timeFetcher blockchain.TimeFetcher) error {
if !syncChecker.Syncing() {
return nil
}
headSlot := headFetcher.HeadSlot()
syncDetailsContainer := &SyncDetailsContainer{
SyncDetails: &SyncDetails{
HeadSlot: strconv.FormatUint(uint64(headSlot), 10),
SyncDistance: strconv.FormatUint(uint64(timeFetcher.CurrentSlot()-headSlot), 10),
IsSyncing: true,
},
}
err := grpc.AppendCustomErrorHeader(ctx, syncDetailsContainer)
if err != nil {
return status.Errorf(
codes.InvalidArgument,
"Syncing to latest head, not ready to respond. Could not prepare sync details: %v",
err,
)
}
return status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
}

View File

@@ -0,0 +1,53 @@
package helpers
import (
"context"
"strings"
"testing"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
types "github.com/prysmaticlabs/eth2-types"
grpcutil "github.com/prysmaticlabs/prysm/api/grpc"
chainmock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
syncmock "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/testing/assert"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
"google.golang.org/grpc"
)
func TestValidateSync(t *testing.T) {
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
t.Run("syncing", func(t *testing.T) {
syncChecker := &syncmock.Sync{
IsSyncing: true,
}
headSlot := types.Slot(100)
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, st.SetSlot(50))
chainService := &chainmock.ChainService{
Slot: &headSlot,
State: st,
}
err = ValidateSync(ctx, syncChecker, chainService, chainService)
require.NotNil(t, err)
sts, ok := grpc.ServerTransportStreamFromContext(ctx).(*runtime.ServerTransportStream)
require.Equal(t, true, ok, "type assertion failed")
md := sts.Header()
v, ok := md[strings.ToLower(grpcutil.CustomErrorMetadataKey)]
require.Equal(t, true, ok, "could not retrieve custom error metadata value")
assert.DeepEqual(
t,
[]string{"{\"sync_details\":{\"head_slot\":\"50\",\"sync_distance\":\"50\",\"is_syncing\":true}}"},
v,
)
})
t.Run("not syncing", func(t *testing.T) {
syncChecker := &syncmock.Sync{
IsSyncing: false,
}
err := ValidateSync(ctx, syncChecker, nil, nil)
require.NoError(t, err)
})
}

View File

@@ -38,8 +38,9 @@ func (vs *Server) GetAttesterDuties(ctx context.Context, req *ethpbv1.AttesterDu
ctx, span := trace.StartSpan(ctx, "validator.GetAttesterDuties")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
if err := rpchelpers.ValidateSync(ctx, vs.SyncChecker, vs.HeadFetcher, vs.TimeFetcher); err != nil {
// We simply return the error because it's already a gRPC error.
return nil, err
}
cs := vs.TimeFetcher.CurrentSlot()
@@ -112,8 +113,9 @@ func (vs *Server) GetProposerDuties(ctx context.Context, req *ethpbv1.ProposerDu
ctx, span := trace.StartSpan(ctx, "validator.GetProposerDuties")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
if err := rpchelpers.ValidateSync(ctx, vs.SyncChecker, vs.HeadFetcher, vs.TimeFetcher); err != nil {
// We simply return the error because it's already a gRPC error.
return nil, err
}
cs := vs.TimeFetcher.CurrentSlot()
@@ -183,8 +185,9 @@ func (vs *Server) GetSyncCommitteeDuties(ctx context.Context, req *ethpbv2.SyncC
ctx, span := trace.StartSpan(ctx, "validator.GetSyncCommitteeDuties")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
if err := rpchelpers.ValidateSync(ctx, vs.SyncChecker, vs.HeadFetcher, vs.TimeFetcher); err != nil {
// We simply return the error because it's already a gRPC error.
return nil, err
}
currentEpoch := slots.ToEpoch(vs.TimeFetcher.CurrentSlot())
@@ -400,8 +403,9 @@ func (vs *Server) SubmitBeaconCommitteeSubscription(ctx context.Context, req *et
ctx, span := trace.StartSpan(ctx, "validator.SubmitBeaconCommitteeSubscription")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
if err := rpchelpers.ValidateSync(ctx, vs.SyncChecker, vs.HeadFetcher, vs.TimeFetcher); err != nil {
// We simply return the error because it's already a gRPC error.
return nil, err
}
if len(req.Data) == 0 {
@@ -476,9 +480,11 @@ func (vs *Server) SubmitSyncCommitteeSubscription(ctx context.Context, req *ethp
ctx, span := trace.StartSpan(ctx, "validator.SubmitSyncCommitteeSubscription")
defer span.End()
if vs.SyncChecker.Syncing() {
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
if err := rpchelpers.ValidateSync(ctx, vs.SyncChecker, vs.HeadFetcher, vs.TimeFetcher); err != nil {
// We simply return the error because it's already a gRPC error.
return nil, err
}
if len(req.Data) == 0 {
return nil, status.Error(codes.InvalidArgument, "No subscriptions provided")
}

View File

@@ -195,8 +195,11 @@ func TestGetAttesterDuties(t *testing.T) {
}
func TestGetAttesterDuties_SyncNotReady(t *testing.T) {
chainService := &mockChain.ChainService{}
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
HeadFetcher: chainService,
TimeFetcher: chainService,
}
_, err := vs.GetAttesterDuties(context.Background(), &ethpbv1.AttesterDutiesRequest{})
assert.ErrorContains(t, "Syncing to latest head, not ready to respond", err)
@@ -316,8 +319,11 @@ func TestGetProposerDuties(t *testing.T) {
}
func TestGetProposerDuties_SyncNotReady(t *testing.T) {
chainService := &mockChain.ChainService{}
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
HeadFetcher: chainService,
TimeFetcher: chainService,
}
_, err := vs.GetProposerDuties(context.Background(), &ethpbv1.ProposerDutiesRequest{})
assert.ErrorContains(t, "Syncing to latest head, not ready to respond", err)
@@ -444,8 +450,11 @@ func TestGetSyncCommitteeDuties(t *testing.T) {
}
func TestGetSyncCommitteeDuties_SyncNotReady(t *testing.T) {
chainService := &mockChain.ChainService{}
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
HeadFetcher: chainService,
TimeFetcher: chainService,
}
_, err := vs.GetSyncCommitteeDuties(context.Background(), &ethpbv2.SyncCommitteeDutiesRequest{})
assert.ErrorContains(t, "Syncing to latest head, not ready to respond", err)
@@ -1152,8 +1161,11 @@ func TestSubmitBeaconCommitteeSubscription(t *testing.T) {
}
func TestSubmitBeaconCommitteeSubscription_SyncNotReady(t *testing.T) {
chainService := &mockChain.ChainService{}
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
HeadFetcher: chainService,
TimeFetcher: chainService,
}
_, err := vs.SubmitBeaconCommitteeSubscription(context.Background(), &ethpbv1.SubmitBeaconCommitteeSubscriptionsRequest{})
assert.ErrorContains(t, "Syncing to latest head, not ready to respond", err)
@@ -1291,8 +1303,11 @@ func TestSubmitSyncCommitteeSubscription(t *testing.T) {
}
func TestSubmitSyncCommitteeSubscription_SyncNotReady(t *testing.T) {
chainService := &mockChain.ChainService{}
vs := &Server{
SyncChecker: &mockSync.Sync{IsSyncing: true},
HeadFetcher: chainService,
TimeFetcher: chainService,
}
_, err := vs.SubmitSyncCommitteeSubscription(context.Background(), &ethpbv2.SubmitSyncCommitteeSubscriptionsRequest{})
assert.ErrorContains(t, "Syncing to latest head, not ready to respond", err)