Files
prysm/validator/rpc/server.go
james-prysm 2e84208169 WebFix develop (#14040)
* fixing issues introduced by PR 13593

* missed setting db

* linting
2024-05-23 14:07:30 +00:00

254 lines
10 KiB
Go

package rpc
import (
"context"
"fmt"
"net"
"net/http"
"path/filepath"
"time"
"github.com/gorilla/mux"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api"
"github.com/prysmaticlabs/prysm/v5/async/event"
"github.com/prysmaticlabs/prysm/v5/io/logs"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/validator/accounts/wallet"
"github.com/prysmaticlabs/prysm/v5/validator/client"
iface "github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"github.com/prysmaticlabs/prysm/v5/validator/db"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// Config holds the options NewServer uses to build the validator RPC server.
// NewServer dereferences the config unconditionally, so it must be non-nil.
type Config struct {
// Host and Port are joined into the TCP address the gRPC server listens on.
Host string
Port string
// GRPCGatewayHost and GRPCGatewayPort are formatted as "host:port" and passed
// to logValidatorWebAuth together with the web auth token.
GRPCGatewayHost string
GRPCGatewayPort int
// The GRPC* options below are copied onto the Server as-is. Their consumers
// are outside this file's visible code — presumably they configure the client
// connection to the beacon node (TODO confirm).
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
GRPCRetryDelay time.Duration
GRPCHeaders []string
// Beacon node connection settings; presumably consumed when the beacon
// clients are registered (TODO confirm).
BeaconNodeGRPCEndpoint string
BeaconApiEndpoint string
BeaconApiTimeout time.Duration
BeaconNodeCert string
// DB is the validator database handle stored on the Server.
DB db.Database
// Wallet may be nil; a non-nil wallet marks the server as wallet-initialized.
Wallet *wallet.Wallet
// WalletDir is also used to derive the default auth token path when
// AuthTokenPath is empty.
WalletDir string
WalletInitializedFeed *event.Feed
ValidatorService *client.ValidatorService
// AuthTokenPath is the location of the web auth token file. When empty and
// WalletDir is set, NewServer falls back to WalletDir/api.AuthTokenFileName.
AuthTokenPath string
// Router receives the REST routes. It is required: InitializeRoutes returns
// an error for a nil router and NewServer treats that error as fatal.
Router *mux.Router
}
// Server is the validator client's RPC server. It owns the gRPC server that
// Start creates and serves, and it registers the validator REST endpoints on
// a mux router (see InitializeRoutes).
type Server struct {
// ctx is derived from the context given to NewServer; cancel is invoked by Stop.
ctx context.Context
cancel context.CancelFunc
// host and port form the gRPC listen address used by Start.
host string
port string
// grpcGatewayHost and grpcGatewayPort form the address logged with the web auth token.
grpcGatewayHost string
grpcGatewayPort int
// listener is set by Start and stays nil when listening failed; Stop uses it
// to decide whether the gRPC server needs to be stopped.
listener net.Listener
grpcMaxCallRecvMsgSize int
grpcRetries uint
grpcRetryDelay time.Duration
grpcHeaders []string
// grpcServer is created in Start.
grpcServer *grpc.Server
// The clients below are not assigned anywhere in this file's visible code —
// presumably registerBeaconClient populates them (TODO confirm).
beaconNodeValidatorClient iface.ValidatorClient
chainClient iface.ChainClient
nodeClient iface.NodeClient
healthClient ethpb.HealthClient
beaconNodeEndpoint string
beaconApiEndpoint string
beaconApiTimeout time.Duration
beaconNodeCert string
jwtSecret []byte
// authTokenPath is the auth token file location; authToken is the value that
// NewServer hands to logValidatorWebAuth (presumably loaded by
// initializeAuthToken — TODO confirm).
authTokenPath string
authToken string
db db.Database
walletDir string
wallet *wallet.Wallet
walletInitializedFeed *event.Feed
// walletInitialized is true when NewServer was given a non-nil wallet.
walletInitialized bool
validatorService *client.ValidatorService
// router is the mux router on which InitializeRoutes registers REST handlers.
router *mux.Router
// logStreamer is created via logs.NewStreamServer in NewServer;
// logStreamerBufferSize is initialized to 1000 there.
logStreamer logs.Streamer
logStreamerBufferSize int
}
// NewServer builds a validator RPC server from cfg and registers its REST
// routes on cfg.Router.
//
// The returned server owns a context derived from ctx; Stop cancels it.
// cfg must be non-nil and cfg.Router must be set: a missing router makes
// InitializeRoutes fail, which is treated as fatal here.
//
// Auth token handling: when cfg.AuthTokenPath is empty but cfg.WalletDir is
// set, the token path defaults to WalletDir/api.AuthTokenFileName. If a token
// path is known, the token is initialized (a failure is logged, not returned),
// the web auth details are logged, and a goroutine is started to pick up
// changes to the token file using the server's context.
func NewServer(ctx context.Context, cfg *Config) *Server {
	ctx, cancel := context.WithCancel(ctx)
	server := &Server{
		ctx:                    ctx,
		cancel:                 cancel,
		logStreamer:            logs.NewStreamServer(),
		logStreamerBufferSize:  1000, // Enough to handle most bursts of logs in the validator client.
		host:                   cfg.Host,
		port:                   cfg.Port,
		grpcGatewayHost:        cfg.GRPCGatewayHost,
		grpcGatewayPort:        cfg.GRPCGatewayPort,
		grpcMaxCallRecvMsgSize: cfg.GRPCMaxCallRecvMsgSize,
		grpcRetries:            cfg.GRPCRetries,
		grpcRetryDelay:         cfg.GRPCRetryDelay,
		grpcHeaders:            cfg.GRPCHeaders,
		validatorService:       cfg.ValidatorService,
		authTokenPath:          cfg.AuthTokenPath,
		db:                     cfg.DB,
		walletDir:              cfg.WalletDir,
		walletInitializedFeed:  cfg.WalletInitializedFeed,
		walletInitialized:      cfg.Wallet != nil,
		wallet:                 cfg.Wallet,
		beaconApiTimeout:       cfg.BeaconApiTimeout,
		beaconApiEndpoint:      cfg.BeaconApiEndpoint,
		beaconNodeEndpoint:     cfg.BeaconNodeGRPCEndpoint,
		// Propagate the configured certificate; without this the
		// Config.BeaconNodeCert option would be silently discarded.
		beaconNodeCert: cfg.BeaconNodeCert,
		router:         cfg.Router,
	}

	// Fall back to a token file inside the wallet directory.
	if server.authTokenPath == "" && server.walletDir != "" {
		server.authTokenPath = filepath.Join(server.walletDir, api.AuthTokenFileName)
	}

	if server.authTokenPath != "" {
		// Best effort: a failure is logged and the server is still returned.
		if err := server.initializeAuthToken(); err != nil {
			log.WithError(err).Error("Could not initialize web auth token")
		}
		validatorWebAddr := fmt.Sprintf("%s:%d", server.grpcGatewayHost, server.grpcGatewayPort)
		logValidatorWebAuth(validatorWebAddr, server.authToken, server.authTokenPath)
		go server.refreshAuthTokenFromFileChanges(server.ctx, server.authTokenPath)
	}

	// Immediately register routes to override any catch-alls.
	if err := server.InitializeRoutes(); err != nil {
		log.WithError(err).Fatal("Could not initialize routes")
	}
	return server
}
// Start opens the TCP listener on host:port, builds the gRPC server with its
// interceptor chain, registers the beacon node client and gRPC reflection,
// and serves in a background goroutine.
//
// A listen failure is logged rather than returned (the method has no error
// result). In that case the gRPC server and beacon client are still set up,
// but nothing is served and no "listening" message is logged. A failure to
// register the beacon client is fatal.
func (s *Server) Start() {
	address := net.JoinHostPort(s.host, s.port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.WithError(err).Errorf("Could not listen to port in Start() %s", address)
	}
	// lis is nil on failure; Stop relies on that to skip the graceful stop.
	s.listener = lis

	// Register interceptors for metrics gathering as well as our
	// own, custom auth token unary interceptor.
	opts := []grpc.ServerOption{
		grpc.StatsHandler(&ocgrpc.ServerHandler{}),
		grpc.UnaryInterceptor(middleware.ChainUnaryServer(
			recovery.UnaryServerInterceptor(
				recovery.WithRecoveryHandlerContext(tracing.RecoveryHandlerFunc),
			),
			grpcprometheus.UnaryServerInterceptor,
			grpcopentracing.UnaryServerInterceptor(),
			s.AuthTokenInterceptor(),
		)),
	}
	grpcprometheus.EnableHandlingTimeHistogram()
	s.grpcServer = grpc.NewServer(opts...)

	// Register a gRPC client to the beacon node. This happens even when
	// listening failed, matching the previous behavior.
	if err := s.registerBeaconClient(); err != nil {
		log.WithError(err).Fatal("Could not register beacon chain gRPC client")
	}

	// Register services available for the gRPC server.
	reflection.Register(s.grpcServer)

	if lis == nil {
		// The listen error was already logged above; there is nothing to
		// serve, and claiming that we are listening would be misleading.
		return
	}

	go func() {
		if err := s.grpcServer.Serve(lis); err != nil {
			log.WithError(err).Error("Could not serve")
		}
	}()
	log.WithField("address", address).Info("gRPC server listening on address")
}
// InitializeRoutes wires the validator client's HTTP REST endpoints onto the
// server's router, behind the auth token middleware. It has to run before the
// router starts serving requests.
//
// It returns an error when the server has no router.
func (s *Server) InitializeRoutes() error {
	if s.router == nil {
		return errors.New("no router found on server")
	}

	// Every route registered below goes through the auth token handler.
	s.router.Use(s.AuthTokenHandler)

	type route struct {
		path    string
		method  string
		handler func(http.ResponseWriter, *http.Request)
	}

	const (
		keystores    = "/eth/v1/keystores"
		remoteKeys   = "/eth/v1/remotekeys"
		gasLimit     = "/eth/v1/validator/{pubkey}/gas_limit"
		feeRecipient = "/eth/v1/validator/{pubkey}/feerecipient"
		graffiti     = "/eth/v1/validator/{pubkey}/graffiti"
	)
	web := api.WebUrlPrefix

	// Registration order is preserved exactly as listed.
	routes := []route{
		// key manager endpoints
		{keystores, http.MethodGet, s.ListKeystores},
		{keystores, http.MethodPost, s.ImportKeystores},
		{keystores, http.MethodDelete, s.DeleteKeystores},
		{remoteKeys, http.MethodGet, s.ListRemoteKeys},
		{remoteKeys, http.MethodPost, s.ImportRemoteKeys},
		{remoteKeys, http.MethodDelete, s.DeleteRemoteKeys},
		{gasLimit, http.MethodGet, s.GetGasLimit},
		{gasLimit, http.MethodPost, s.SetGasLimit},
		{gasLimit, http.MethodDelete, s.DeleteGasLimit},
		{feeRecipient, http.MethodGet, s.ListFeeRecipientByPubkey},
		{feeRecipient, http.MethodPost, s.SetFeeRecipientByPubkey},
		{feeRecipient, http.MethodDelete, s.DeleteFeeRecipientByPubkey},
		{"/eth/v1/validator/{pubkey}/voluntary_exit", http.MethodPost, s.SetVoluntaryExit},
		{graffiti, http.MethodGet, s.GetGraffiti},
		{graffiti, http.MethodPost, s.SetGraffiti},
		{graffiti, http.MethodDelete, s.DeleteGraffiti},
		// auth endpoint
		{web + "initialize", http.MethodGet, s.Initialize},
		// accounts endpoints
		{web + "accounts", http.MethodGet, s.ListAccounts},
		{web + "accounts/backup", http.MethodPost, s.BackupAccounts},
		{web + "accounts/voluntary-exit", http.MethodPost, s.VoluntaryExit},
		// web health endpoints
		{web + "health/version", http.MethodGet, s.GetVersion},
		{web + "health/logs/validator/stream", http.MethodGet, s.StreamValidatorLogs},
		{web + "health/logs/beacon/stream", http.MethodGet, s.StreamBeaconLogs},
		// beacon calls
		{web + "beacon/status", http.MethodGet, s.GetBeaconStatus},
		{web + "beacon/summary", http.MethodGet, s.GetValidatorPerformance},
		{web + "beacon/validators", http.MethodGet, s.GetValidators},
		{web + "beacon/balances", http.MethodGet, s.GetValidatorBalances},
		{web + "beacon/peers", http.MethodGet, s.GetPeers},
		// web wallet endpoints
		{web + "wallet", http.MethodGet, s.WalletConfig},
		{web + "wallet/create", http.MethodPost, s.CreateWallet},
		{web + "wallet/keystores/validate", http.MethodPost, s.ValidateKeystores},
		{web + "wallet/recover", http.MethodPost, s.RecoverWallet},
		// slashing protection endpoints
		{web + "slashing-protection/export", http.MethodGet, s.ExportSlashingProtection},
		{web + "slashing-protection/import", http.MethodPost, s.ImportSlashingProtection},
	}
	for _, rt := range routes {
		s.router.HandleFunc(rt.path, rt.handler).Methods(rt.method)
	}

	log.Info("Initialized REST API routes")
	return nil
}
// Stop cancels the server's context and, if Start managed to open a listener,
// asks the gRPC server to stop gracefully. It always returns nil.
func (s *Server) Stop() error {
	s.cancel()

	// Without a listener the gRPC server was never serving, so there is
	// nothing to shut down.
	if s.listener == nil {
		return nil
	}

	s.grpcServer.GracefulStop()
	log.Debug("Initiated graceful stop of server")
	return nil
}
// Status reports whether the service is unhealthy by returning a non-nil error.
// The current implementation performs no checks and always returns nil.
func (s *Server) Status() error {
return nil
}