gRPC Gateway Removal (#14089)

* wip passing e2e

* reverting temp comment

* remove unneeded comments

* fixing merge errors

* fixing more bugs from merge

* fixing test

* WIP moving code around and fixing tests

* unused linting

* gaz

* temp removing these tests as we need placeholder/wrapper APIs for them with the removal of the gateway

* attempting to remove dependencies to gRPC gateway , 1 mroe left in deps.bzl

* renaming flags and other gateway services to http

* goimport

* fixing deepsource

* git mv

* Update validator/package/validator.yaml

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update validator/package/validator.yaml

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update cmd/beacon-chain/flags/base.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update cmd/beacon-chain/flags/base.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* Update cmd/beacon-chain/flags/base.go

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* addressing feedback

* missed lint

* renaming import

* reversal based on feedback

* fixing web ui registration

* don't require mux handler

* gaz

* removing gRPC service from validator completely, merged with http service, renames are a work in progress

* updating go.sum

* linting

* trailing white space

* realized there was more cleanup i could do with code reuse

* adding wrapper for routes

* reverting version

* fixing dependencies from merging develop

* gaz

* fixing unit test

* fixing dependencies

* reverting unit test

* fixing conflict

* updating change log

* Update log.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* gaz

* Update api/server/httprest/server.go

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>

* addressing some feedback

* forgot to remove deprecated flag in usage

* gofmt

* fixing test

* fixing deepsource issue

* moving deprecated flag and adding timeout handler

* missed removal of a flag

* fixing test:

* Update CHANGELOG.md

Co-authored-by: Radosław Kapka <rkapka@wp.pl>

* addressing feedback

* updating comments based on feedback

* removing unused field for now, we can add it back in if we need to use the option

* removing unused struct

* changing api-timeout flag based on feedback

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
james-prysm
2024-09-04 10:40:31 -05:00
committed by GitHub
parent 963a1b4cb7
commit 45fd3eb1bf
94 changed files with 494 additions and 7751 deletions

View File

@@ -26,6 +26,7 @@ go_library(
"//api/grpc:go_default_library",
"//api/pagination:go_default_library",
"//api/server:go_default_library",
"//api/server/httprest:go_default_library",
"//api/server/structs:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
@@ -42,7 +43,6 @@ go_library(
"//io/file:go_default_library",
"//io/logs:go_default_library",
"//io/prompt:go_default_library",
"//monitoring/tracing:go_default_library",
"//monitoring/tracing/trace:go_default_library",
"//network/httputil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
@@ -63,13 +63,13 @@ go_library(
"//validator/keymanager/local:go_default_library",
"//validator/slashing-protection-history:go_default_library",
"//validator/slashing-protection-history/format:go_default_library",
"//validator/web:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_fsnotify_fsnotify//:go_default_library",
"@com_github_golang_jwt_jwt_v4//:go_default_library",
"@com_github_gorilla_mux//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//recovery:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//tracing/opentracing:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_prometheus//:go_default_library",
@@ -78,11 +78,9 @@ go_library(
"@com_github_tyler_smith_go_bip39//:go_default_library",
"@com_github_tyler_smith_go_bip39//wordlists:go_default_library",
"@com_github_wealdtech_go_eth2_wallet_encryptor_keystorev4//:go_default_library",
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//reflection:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
],
@@ -144,7 +142,6 @@ go_test(
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_google_uuid//:go_default_library",
"@com_github_gorilla_mux//:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway_v2//runtime:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_tyler_smith_go_bip39//:go_default_library",

View File

@@ -104,7 +104,7 @@ func (s *Server) refreshAuthTokenFromFileChanges(ctx context.Context, authTokenP
log.WithError(err).Errorf("Could not watch for file changes for: %s", authTokenPath)
continue
}
validatorWebAddr := fmt.Sprintf("%s:%d", s.grpcGatewayHost, s.grpcGatewayPort)
validatorWebAddr := fmt.Sprintf("%s:%d", s.httpHost, s.httpPort)
logValidatorWebAuth(validatorWebAddr, s.authToken, authTokenPath)
case err := <-watcher.Errors:
log.WithError(err).Errorf("Could not watch for file changes for: %s", authTokenPath)

View File

@@ -19,7 +19,7 @@ import (
"google.golang.org/grpc"
)
// Initialize a client connect to a beacon node gRPC endpoint.
// Initialize a client connect to a beacon node gRPC or HTTP endpoint.
func (s *Server) registerBeaconClient() error {
streamInterceptor := grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpcopentracing.StreamClientInterceptor(),
@@ -62,6 +62,5 @@ func (s *Server) registerBeaconClient() error {
s.chainClient = beaconChainClientFactory.NewChainClient(conn, restHandler)
s.nodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn, restHandler)
return nil
}

View File

@@ -42,6 +42,7 @@ func (s *Server) GetBeaconStatus(w http.ResponseWriter, r *http.Request) {
}
genesisTime := uint64(time.Unix(genesis.GenesisTime.Seconds, 0).Unix())
address := genesis.DepositContractAddress
chainHead, err := s.chainClient.ChainHead(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "ChainHead").Error(), http.StatusInternalServerError)

View File

@@ -17,7 +17,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prysmaticlabs/prysm/v5/cmd/validator/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -46,7 +45,6 @@ import (
mocks "github.com/prysmaticlabs/prysm/v5/validator/testing"
"github.com/urfave/cli/v2"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
)
@@ -707,7 +705,7 @@ func TestServer_SetVoluntaryExit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
ctx := context.Background()
defaultWalletPath = setupWalletDir(t)
opts := []accounts.Option{
accounts.WithWalletDir(defaultWalletPath),
@@ -970,7 +968,7 @@ func TestServer_SetGasLimit(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
beaconClient := validatormock.NewMockValidatorClient(ctrl)
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
ctx := context.Background()
pubkey1, err := hexutil.Decode("0xaf2e7ba294e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2c06bd3713cb442072ae591493")
pubkey2, err2 := hexutil.Decode("0xbedefeaa94e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2cdddddddddddddddddddddddd")
@@ -1178,7 +1176,7 @@ func TestServer_SetGasLimit_InvalidPubKey(t *testing.T) {
}
func TestServer_DeleteGasLimit(t *testing.T) {
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
ctx := context.Background()
pubkey1, err := hexutil.Decode("0xaf2e7ba294e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2c06bd3713cb442072ae591493")
pubkey2, err2 := hexutil.Decode("0xbedefeaa94e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2cdddddddddddddddddddddddd")
require.NoError(t, err)
@@ -1621,7 +1619,7 @@ func TestServer_FeeRecipientByPubkey(t *testing.T) {
defer ctrl.Finish()
beaconClient := validatormock.NewMockValidatorClient(ctrl)
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
ctx := context.Background()
pubkey := "0xaf2e7ba294e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2c06bd3713cb442072ae591493"
byteval, err := hexutil.Decode(pubkey)
require.NoError(t, err)
@@ -1831,7 +1829,7 @@ func TestServer_SetFeeRecipientByPubkey_InvalidFeeRecipient(t *testing.T) {
}
func TestServer_DeleteFeeRecipientByPubkey(t *testing.T) {
ctx := grpc.NewContextWithServerTransportStream(context.Background(), &runtime.ServerTransportStream{})
ctx := context.Background()
pubkey := "0xaf2e7ba294e03438ea819bd4033c6c1bf6b04320ee2075b77273c08d02f8a61bcc303c2c06bd3713cb442072ae591493"
byteval, err := hexutil.Decode(pubkey)
require.NoError(t, err)

View File

@@ -6,34 +6,27 @@ import (
"net"
"net/http"
"path/filepath"
"strings"
"time"
"github.com/gorilla/mux"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api"
"github.com/prysmaticlabs/prysm/v5/api/server/httprest"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/io/logs"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/validator/accounts/wallet"
"github.com/prysmaticlabs/prysm/v5/validator/client"
iface "github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"github.com/prysmaticlabs/prysm/v5/validator/db"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"github.com/prysmaticlabs/prysm/v5/validator/web"
)
// Config options for the gRPC server.
// Config options for the HTTP server.
type Config struct {
Host string
Port string
GRPCGatewayHost string
GRPCGatewayPort int
HTTPHost string
HTTPPort int
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
GRPCRetryDelay time.Duration
@@ -51,20 +44,17 @@ type Config struct {
Router *mux.Router
}
// Server defining a gRPC server for the remote signer API.
// Server defining a HTTP server for the remote signer API and registering clients
type Server struct {
ctx context.Context
cancel context.CancelFunc
host string
port string
grpcGatewayHost string
grpcGatewayPort int
listener net.Listener
httpHost string
httpPort int
server *httprest.Server
grpcMaxCallRecvMsgSize int
grpcRetries uint
grpcRetryDelay time.Duration
grpcHeaders []string
grpcServer *grpc.Server
beaconNodeValidatorClient iface.ValidatorClient
chainClient iface.ChainClient
nodeClient iface.NodeClient
@@ -85,9 +75,10 @@ type Server struct {
router *mux.Router
logStreamer logs.Streamer
logStreamerBufferSize int
startFailure error
}
// NewServer instantiates a new gRPC server.
// NewServer instantiates a new HTTP server.
func NewServer(ctx context.Context, cfg *Config) *Server {
ctx, cancel := context.WithCancel(ctx)
server := &Server{
@@ -95,10 +86,8 @@ func NewServer(ctx context.Context, cfg *Config) *Server {
cancel: cancel,
logStreamer: logs.NewStreamServer(),
logStreamerBufferSize: 1000, // Enough to handle most bursts of logs in the validator client.
host: cfg.Host,
port: cfg.Port,
grpcGatewayHost: cfg.GRPCGatewayHost,
grpcGatewayPort: cfg.GRPCGatewayPort,
httpHost: cfg.HTTPHost,
httpPort: cfg.HTTPPort,
grpcMaxCallRecvMsgSize: cfg.GRPCMaxCallRecvMsgSize,
grpcRetries: cfg.GRPCRetries,
grpcRetryDelay: cfg.GRPCRetryDelay,
@@ -124,62 +113,54 @@ func NewServer(ctx context.Context, cfg *Config) *Server {
if err := server.initializeAuthToken(); err != nil {
log.WithError(err).Error("Could not initialize web auth token")
}
validatorWebAddr := fmt.Sprintf("%s:%d", server.grpcGatewayHost, server.grpcGatewayPort)
validatorWebAddr := fmt.Sprintf("%s:%d", server.httpHost, server.httpPort)
logValidatorWebAuth(validatorWebAddr, server.authToken, server.authTokenPath)
go server.refreshAuthTokenFromFileChanges(server.ctx, server.authTokenPath)
}
// immediately register routes to override any catchalls
if err := server.InitializeRoutes(); err != nil {
log.WithError(err).Fatal("Could not initialize routes")
// Register a gRPC or HTTP client to the beacon node.
// Used for proxy calls to beacon node from validator REST handlers
if err := server.registerBeaconClient(); err != nil {
log.WithError(err).Fatal("Could not register beacon chain gRPC or HTTP client")
}
if err := server.InitializeRoutesWithWebHandler(); err != nil {
log.WithError(err).Fatal("Could not initialize routes with web handler")
}
opts := []httprest.Option{
httprest.WithRouter(cfg.Router),
httprest.WithHTTPAddr(net.JoinHostPort(server.httpHost, fmt.Sprintf("%d", server.httpPort))),
}
// create and set a new http server
s, err := httprest.New(server.ctx, opts...)
if err != nil {
log.WithError(err).Fatal("Failed to create HTTP server")
}
server.server = s
return server
}
// Start the gRPC server.
// Start the HTTP server and registers clients that can communicate via HTTP or gRPC.
func (s *Server) Start() {
// Setup the gRPC server options and TLS configuration.
address := net.JoinHostPort(s.host, s.port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.WithError(err).Errorf("Could not listen to port in Start() %s", address)
s.server.Start()
}
// InitializeRoutesWithWebHandler adds a catchall wrapper for web handling
func (s *Server) InitializeRoutesWithWebHandler() error {
if err := s.InitializeRoutes(); err != nil {
return err
}
s.listener = lis
// Register interceptors for metrics gathering as well as our
// own, custom JWT unary interceptor.
opts := []grpc.ServerOption{
grpc.StatsHandler(&ocgrpc.ServerHandler{}),
grpc.UnaryInterceptor(middleware.ChainUnaryServer(
recovery.UnaryServerInterceptor(
recovery.WithRecoveryHandlerContext(tracing.RecoveryHandlerFunc),
),
grpcprometheus.UnaryServerInterceptor,
grpcopentracing.UnaryServerInterceptor(),
s.AuthTokenInterceptor(),
)),
}
grpcprometheus.EnableHandlingTimeHistogram()
s.grpcServer = grpc.NewServer(opts...)
// Register a gRPC client to the beacon node.
if err := s.registerBeaconClient(); err != nil {
log.WithError(err).Fatal("Could not register beacon chain gRPC client")
}
// Register services available for the gRPC server.
reflection.Register(s.grpcServer)
// routes needs to be set before the server calls the server function
go func() {
if s.listener != nil {
if err := s.grpcServer.Serve(s.listener); err != nil {
log.WithError(err).Error("Could not serve")
}
s.router.PathPrefix("/").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/api") {
r.URL.Path = strings.Replace(r.URL.Path, "/api", "", 1) // used to redirect apis to standard rest APIs
s.router.ServeHTTP(w, r)
} else {
// Finally, we handle with the web server.
web.Handler(w, r)
}
}()
log.WithField("address", address).Info("gRPC server listening on address")
})
return nil
}
// InitializeRoutes initializes pure HTTP REST endpoints for the validator client.
@@ -233,21 +214,20 @@ func (s *Server) InitializeRoutes() error {
// slashing protection endpoints
s.router.HandleFunc(api.WebUrlPrefix+"slashing-protection/export", s.ExportSlashingProtection).Methods(http.MethodGet)
s.router.HandleFunc(api.WebUrlPrefix+"slashing-protection/import", s.ImportSlashingProtection).Methods(http.MethodPost)
log.Info("Initialized REST API routes")
return nil
}
// Stop the gRPC server.
// Stop the HTTP server.
func (s *Server) Stop() error {
s.cancel()
if s.listener != nil {
s.grpcServer.GracefulStop()
log.Debug("Initiated graceful stop of server")
}
return nil
return s.server.Stop()
}
// Status returns an error if the service is unhealthy.
func (s *Server) Status() error {
if s.startFailure != nil {
return s.startFailure
}
return nil
}