HTTP validator API: health endpoints (#13149)

* updating health endpoints

* updating tests

* updating tests

* moving where the header is written and adding allow origin header

* removing header

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/rpc/handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* radek's comments

* Update handlers_health.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* adding the correct errors to handle error

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
james-prysm
2023-11-02 10:51:21 -05:00
committed by GitHub
parent 57eda1de63
commit c0fb16a96f
15 changed files with 801 additions and 1611 deletions

View File

@@ -796,7 +796,6 @@ func (c *ValidatorClient) registerRPCGatewayService(router *mux.Router) error {
validatorpb.RegisterAuthHandler,
validatorpb.RegisterWalletHandler,
pb.RegisterHealthHandler,
validatorpb.RegisterHealthHandler,
validatorpb.RegisterAccountsHandler,
validatorpb.RegisterBeaconHandler,
validatorpb.RegisterSlashingProtectionHandler,

View File

@@ -6,8 +6,8 @@ go_library(
"accounts.go",
"auth_token.go",
"beacon.go",
"handlers_health.go",
"handlers_keymanager.go",
"health.go",
"intercepter.go",
"log.go",
"server.go",
@@ -91,8 +91,8 @@ go_test(
"accounts_test.go",
"auth_token_test.go",
"beacon_test.go",
"handlers_health_test.go",
"handlers_keymanager_test.go",
"health_test.go",
"intercepter_test.go",
"server_test.go",
"slashing_test.go",
@@ -112,6 +112,7 @@ go_test(
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//io/logs/mock:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/validator-client:go_default_library",
"//testing/assert:go_default_library",

View File

@@ -0,0 +1,163 @@
package rpc
import (
"encoding/json"
"fmt"
"net/http"
http2 "github.com/prysmaticlabs/prysm/v4/network/http"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"go.opencensus.io/trace"
"google.golang.org/protobuf/types/known/emptypb"
)
// GetVersion returns the beacon node and validator client versions
func (s *Server) GetVersion(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.GetVersion")
defer span.End()
beacon, err := s.beaconNodeClient.GetVersion(ctx, &emptypb.Empty{})
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
http2.WriteJson(w, struct {
Beacon string `json:"beacon"`
Validator string `json:"validator"`
}{
Beacon: beacon.Version,
Validator: version.Version(),
})
}
// StreamBeaconLogs from the beacon node via server-side events.
func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
// Wrap service context with a cancel in order to propagate the exiting of
// this method properly to the beacon node server.
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs")
defer span.End()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Flush helper function to ensure data is sent to client
flusher, ok := w.(http.Flusher)
if !ok {
http2.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// TODO: StreamBeaconLogs grpc will need to be replaced in the future
client, err := s.beaconNodeHealthClient.StreamBeaconLogs(ctx, &emptypb.Empty{})
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
for {
select {
case <-s.ctx.Done():
return
case <-ctx.Done():
return
case <-client.Context().Done():
return
default:
logResp, err := client.Recv()
if err != nil {
http2.HandleError(w, "could not receive beacon logs from stream: "+err.Error(), http.StatusInternalServerError)
return
}
jsonResp, err := json.Marshal(logResp)
if err != nil {
http2.HandleError(w, "could not encode log response into JSON: "+err.Error(), http.StatusInternalServerError)
return
}
// Send the response as an SSE event
// Assuming resp has a String() method for simplicity
_, err = fmt.Fprintf(w, "%s\n", jsonResp)
if err != nil {
http2.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
// Flush the data to the client immediately
flusher.Flush()
}
}
}
// StreamValidatorLogs from the validator client via server-side events.
func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamValidatorLogs")
defer span.End()
// Ensure that the writer supports flushing.
flusher, ok := w.(http.Flusher)
if !ok {
http2.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
ch := make(chan []byte, s.streamLogsBufferSize)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
defer func() {
sub.Unsubscribe()
close(ch)
}()
// Set up SSE response headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
recentLogs := s.logsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
for i, l := range recentLogs {
logStrings[i] = string(l)
}
ls := &pb.LogsResponse{
Logs: logStrings,
}
jsonLogs, err := json.Marshal(ls)
if err != nil {
http2.HandleError(w, "Failed to marshal logs: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = fmt.Fprintf(w, "%s\n", jsonLogs)
if err != nil {
http2.HandleError(w, "Error sending data: "+err.Error(), http.StatusInternalServerError)
return
}
flusher.Flush()
for {
select {
case log := <-ch:
// Set up SSE response headers
ls = &pb.LogsResponse{
Logs: []string{string(log)},
}
jsonLogs, err = json.Marshal(ls)
if err != nil {
http2.HandleError(w, "Failed to marshal logs: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = fmt.Fprintf(w, "%s\n", jsonLogs)
if err != nil {
http2.HandleError(w, "Error sending data: "+err.Error(), http.StatusInternalServerError)
return
}
flusher.Flush()
case <-s.ctx.Done():
return
case err := <-sub.Err():
http2.HandleError(w, "Subscriber error: "+err.Error(), http.StatusInternalServerError)
return
case <-ctx.Done():
return
}
}
}

View File

@@ -0,0 +1,191 @@
package rpc
import (
"bytes"
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes/empty"
"github.com/prysmaticlabs/prysm/v4/io/logs/mock"
"github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/require"
validatormock "github.com/prysmaticlabs/prysm/v4/testing/validator-mock"
"google.golang.org/grpc"
)
type MockBeaconNodeHealthClient struct {
grpc.ClientStream
logs []*pb.LogsResponse
err error
}
func (m *MockBeaconNodeHealthClient) StreamBeaconLogs(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (eth.Health_StreamBeaconLogsClient, error) {
return m, m.err
}
func (m *MockBeaconNodeHealthClient) Recv() (*eth.LogsResponse, error) {
if len(m.logs) == 0 {
return nil, io.EOF
}
log := m.logs[0]
m.logs = m.logs[1:]
return log, nil
}
func (m *MockBeaconNodeHealthClient) SendMsg(_ interface{}) error {
return m.err
}
func (m *MockBeaconNodeHealthClient) Context() context.Context {
return context.Background()
}
type flushableResponseRecorder struct {
*httptest.ResponseRecorder
flushed bool
}
func (f *flushableResponseRecorder) Flush() {
f.flushed = true
}
func TestStreamBeaconLogs(t *testing.T) {
logs := []*pb.LogsResponse{
{
Logs: []string{"log1", "log2"},
},
{
Logs: []string{"log3", "log4"},
},
}
mockClient := &MockBeaconNodeHealthClient{
logs: logs,
err: nil,
}
// Setting up the mock in the server struct
s := Server{
ctx: context.Background(),
beaconNodeHealthClient: mockClient,
}
// Create a mock ResponseWriter and Request
w := &flushableResponseRecorder{
ResponseRecorder: httptest.NewRecorder(),
}
r := httptest.NewRequest("GET", "/v2/validator/health/logs/beacon/stream", nil)
// Call the function
s.StreamBeaconLogs(w, r)
// Assert the results
resp := w.Result()
if resp.StatusCode != http.StatusOK {
t.Fatalf("Expected status OK but got %v", resp.StatusCode)
}
ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true)
require.Equal(t, ct[0], "text/event-stream")
cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive")
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, body)
require.StringContains(t, `{"logs":["log1","log2"]}`, string(body))
require.StringContains(t, `{"logs":["log3","log4"]}`, string(body))
if !w.flushed {
t.Fatal("Flush was not called")
}
}
func TestStreamValidatorLogs(t *testing.T) {
ctx := context.Background()
mockLogs := [][]byte{
[]byte("[2023-10-31 10:00:00] INFO: Starting server..."),
[]byte("[2023-10-31 10:01:23] DEBUG: Database connection established."),
[]byte("[2023-10-31 10:05:45] WARN: High memory usage detected."),
[]byte("[2023-10-31 10:10:12] INFO: New user registered: user123."),
[]byte("[2023-10-31 10:15:30] ERROR: Failed to send email."),
}
logStreamer := mock.NewMockStreamer(mockLogs)
// Setting up the mock in the server struct
s := Server{
ctx: ctx,
logsStreamer: logStreamer,
streamLogsBufferSize: 100,
}
w := &flushableResponseRecorder{
ResponseRecorder: httptest.NewRecorder(),
}
r := httptest.NewRequest("GET", "/v2/validator/health/logs/validator/stream", nil)
go func() {
s.StreamValidatorLogs(w, r)
}()
// wait for initiation of StreamValidatorLogs
time.Sleep(100 * time.Millisecond)
logStreamer.LogsFeed().Send([]byte("Some mock event data"))
// wait for feed
time.Sleep(100 * time.Millisecond)
s.ctx.Done()
// Assert the results
resp := w.Result()
if resp.StatusCode != http.StatusOK {
t.Fatalf("Expected status OK but got %v", resp.StatusCode)
}
ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true)
require.Equal(t, ct[0], "text/event-stream")
cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive")
// Check if data was written
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, body)
require.StringContains(t, `{"logs":["[2023-10-31 10:00:00] INFO: Starting server...","[2023-10-31 10:01:23] DEBUG: Database connection established.",`+
`"[2023-10-31 10:05:45] WARN: High memory usage detected.","[2023-10-31 10:10:12] INFO: New user registered: user123.","[2023-10-31 10:15:30] ERROR: Failed to send email."]}`, string(body))
require.StringContains(t, `{"logs":["Some mock event data"]}`, string(body))
// Check if Flush was called
if !w.flushed {
t.Fatal("Flush was not called")
}
}
func TestServer_GetVersion(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
mockNodeClient := validatormock.NewMockNodeClient(ctrl)
s := Server{
ctx: ctx,
beaconNodeClient: mockNodeClient,
}
mockNodeClient.EXPECT().GetVersion(gomock.Any(), gomock.Any()).Return(&eth.Version{
Version: "4.10.1",
Metadata: "beacon node",
}, nil)
r := httptest.NewRequest("GET", "/v2/validator/health/version", nil)
w := httptest.NewRecorder()
w.Body = &bytes.Buffer{}
s.GetVersion(w, r)
resp := w.Result()
if resp.StatusCode != http.StatusOK {
t.Fatalf("Expected status OK but got %v", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, body)
require.StringContains(t, `{"beacon":"4.10.1","validator":"Prysm/Unknown/Local build. Built at: Moments ago"}`, string(body))
}

View File

@@ -1,132 +0,0 @@
package rpc
import (
"context"
"time"
"github.com/pkg/errors"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
validatorpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/validator-client"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
// GetBeaconNodeConnection retrieves the current beacon node connection
// information, as well as its sync status.
// DEPRECATED: Prysm Web UI and associated endpoints will be fully removed in a future hard fork.
func (s *Server) GetBeaconNodeConnection(ctx context.Context, _ *emptypb.Empty) (*validatorpb.NodeConnectionResponse, error) {
syncStatus, err := s.syncChecker.Syncing(ctx)
if err != nil || s.validatorService.Status() != nil {
//nolint:nilerr
return &validatorpb.NodeConnectionResponse{
GenesisTime: 0,
BeaconNodeEndpoint: s.nodeGatewayEndpoint,
Connected: false,
Syncing: false,
}, nil
}
genesis, err := s.genesisFetcher.GenesisInfo(ctx)
if err != nil {
return nil, err
}
return &validatorpb.NodeConnectionResponse{
GenesisTime: uint64(time.Unix(genesis.GenesisTime.Seconds, 0).Unix()),
DepositContractAddress: genesis.DepositContractAddress,
BeaconNodeEndpoint: s.nodeGatewayEndpoint,
Connected: true,
Syncing: syncStatus,
}, nil
}
// GetLogsEndpoints for the beacon and validator client.
// DEPRECATED: Prysm Web UI and associated endpoints will be fully removed in a future hard fork.
func (*Server) GetLogsEndpoints(_ context.Context, _ *emptypb.Empty) (*validatorpb.LogsEndpointResponse, error) {
return nil, status.Error(codes.Unimplemented, "unimplemented")
}
// GetVersion --
// DEPRECATED: Prysm Web UI and associated endpoints will be fully removed in a future hard fork.
func (s *Server) GetVersion(ctx context.Context, _ *emptypb.Empty) (*validatorpb.VersionResponse, error) {
beacon, err := s.beaconNodeClient.GetVersion(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}
return &validatorpb.VersionResponse{
Beacon: beacon.Version,
Validator: version.Version(),
}, nil
}
// StreamBeaconLogs from the beacon node via a gRPC server-side stream.
// DEPRECATED: Prysm Web UI and associated endpoints will be fully removed in a future hard fork.
func (s *Server) StreamBeaconLogs(req *emptypb.Empty, stream validatorpb.Health_StreamBeaconLogsServer) error {
// Wrap service context with a cancel in order to propagate the exiting of
// this method properly to the beacon node server.
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
client, err := s.beaconNodeHealthClient.StreamBeaconLogs(ctx, req)
if err != nil {
return err
}
for {
select {
case <-s.ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
case <-client.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
default:
resp, err := client.Recv()
if err != nil {
return errors.Wrap(err, "could not receive beacon logs from stream")
}
if err := stream.Send(resp); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
}
}
// StreamValidatorLogs from the validator client via a gRPC server-side stream.
// DEPRECATED: Prysm Web UI and associated endpoints will be fully removed in a future hard fork.
func (s *Server) StreamValidatorLogs(_ *emptypb.Empty, stream validatorpb.Health_StreamValidatorLogsServer) error {
ch := make(chan []byte, s.streamLogsBufferSize)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
defer func() {
sub.Unsubscribe()
defer close(ch)
}()
recentLogs := s.logsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
for i, log := range recentLogs {
logStrings[i] = string(log)
}
if err := stream.Send(&pb.LogsResponse{
Logs: logStrings,
}); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
for {
select {
case log := <-ch:
resp := &pb.LogsResponse{
Logs: []string{string(log)},
}
if err := stream.Send(resp); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
case <-s.ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case err := <-sub.Err():
return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err)
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}
}
}

View File

@@ -1,54 +0,0 @@
package rpc
import (
"context"
"testing"
"time"
"github.com/golang/protobuf/ptypes/empty"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1/validator-client"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/validator/client"
"google.golang.org/protobuf/types/known/timestamppb"
)
type mockSyncChecker struct {
syncing bool
}
func (m *mockSyncChecker) Syncing(_ context.Context) (bool, error) {
return m.syncing, nil
}
type mockGenesisFetcher struct{}
func (_ *mockGenesisFetcher) GenesisInfo(_ context.Context) (*ethpb.Genesis, error) {
genesis := timestamppb.New(time.Unix(0, 0))
return &ethpb.Genesis{
GenesisTime: genesis,
}, nil
}
func TestServer_GetBeaconNodeConnection(t *testing.T) {
ctx := context.Background()
endpoint := "localhost:90210"
vs, err := client.NewValidatorService(ctx, &client.Config{})
require.NoError(t, err)
s := &Server{
walletInitialized: true,
validatorService: vs,
syncChecker: &mockSyncChecker{syncing: false},
genesisFetcher: &mockGenesisFetcher{},
nodeGatewayEndpoint: endpoint,
}
got, err := s.GetBeaconNodeConnection(ctx, &empty.Empty{})
require.NoError(t, err)
want := &pb.NodeConnectionResponse{
BeaconNodeEndpoint: endpoint,
Connected: true,
Syncing: false,
GenesisTime: uint64(time.Unix(0, 0).Unix()),
}
require.DeepEqual(t, want, got)
}

View File

@@ -185,7 +185,6 @@ func (s *Server) Start() {
reflection.Register(s.grpcServer)
validatorpb.RegisterAuthServer(s.grpcServer, s)
validatorpb.RegisterWalletServer(s.grpcServer, s)
validatorpb.RegisterHealthServer(s.grpcServer, s)
validatorpb.RegisterBeaconServer(s.grpcServer, s)
validatorpb.RegisterAccountsServer(s.grpcServer, s)
validatorpb.RegisterSlashingProtectionServer(s.grpcServer, s)
@@ -234,7 +233,10 @@ func (s *Server) InitializeRoutes() error {
s.router.HandleFunc("/eth/v1/validator/{pubkey}/feerecipient", s.SetFeeRecipientByPubkey).Methods(http.MethodPost)
s.router.HandleFunc("/eth/v1/validator/{pubkey}/feerecipient", s.DeleteFeeRecipientByPubkey).Methods(http.MethodDelete)
s.router.HandleFunc("/eth/v1/validator/{pubkey}/voluntary_exit", s.SetVoluntaryExit).Methods(http.MethodPost)
// ...
// web health endpoints
s.router.HandleFunc("/v2/validator/health/version", s.GetVersion).Methods(http.MethodGet)
s.router.HandleFunc("/v2/validator/health/logs/validator/stream", s.StreamValidatorLogs).Methods(http.MethodGet)
s.router.HandleFunc("/v2/validator/health/logs/beacon/stream", s.StreamBeaconLogs).Methods(http.MethodGet)
log.Info("Initialized REST API routes")
return nil
}

View File

@@ -19,11 +19,14 @@ func TestServer_InitializeRoutes(t *testing.T) {
require.NoError(t, err)
wantRouteList := map[string][]string{
"/eth/v1/keystores": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/remotekeys": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/gas_limit": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/feerecipient": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/voluntary_exit": {http.MethodPost},
"/eth/v1/keystores": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/remotekeys": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/gas_limit": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/feerecipient": {http.MethodGet, http.MethodPost, http.MethodDelete},
"/eth/v1/validator/{pubkey}/voluntary_exit": {http.MethodPost},
"/v2/validator/health/version": {http.MethodGet},
"/v2/validator/health/logs/validator/stream": {http.MethodGet},
"/v2/validator/health/logs/beacon/stream": {http.MethodGet},
}
gotRouteList := make(map[string][]string)
err = s.router.Walk(func(route *mux.Route, router *mux.Router, ancestors []*mux.Route) error {