Substantial VC cleanup (#13593)

* Cleanup part 1

* Cleanup part 2

* Cleanup part 3

* remove lock field init

* doc for SignerConfig

* remove vars

* use full Keymanager word in function

* revert interface rename

* linter

* fix build issues

* review
This commit is contained in:
Radosław Kapka
2024-05-22 01:39:00 +09:00
committed by GitHub
parent 9befb6bd06
commit 30cc23c5de
49 changed files with 673 additions and 802 deletions

View File

@@ -81,7 +81,6 @@ go_library(
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//reflection:go_default_library",
"@org_golang_google_grpc//status:go_default_library",

View File

@@ -104,7 +104,7 @@ func (s *Server) refreshAuthTokenFromFileChanges(ctx context.Context, authTokenP
log.WithError(err).Errorf("Could not watch for file changes for: %s", authTokenPath)
continue
}
validatorWebAddr := fmt.Sprintf("%s:%d", s.validatorGatewayHost, s.validatorGatewayPort)
validatorWebAddr := fmt.Sprintf("%s:%d", s.grpcGatewayHost, s.grpcGatewayPort)
logValidatorWebAuth(validatorWebAddr, s.authToken, authTokenPath)
case err := <-watcher.Errors:
log.WithError(err).Errorf("Could not watch for file changes for: %s", authTokenPath)

View File

@@ -27,26 +27,26 @@ func (s *Server) registerBeaconClient() error {
grpcretry.StreamClientInterceptor(),
))
dialOpts := client.ConstructDialOptions(
s.clientMaxCallRecvMsgSize,
s.clientWithCert,
s.clientGrpcRetries,
s.clientGrpcRetryDelay,
s.grpcMaxCallRecvMsgSize,
s.beaconNodeCert,
s.grpcRetries,
s.grpcRetryDelay,
streamInterceptor,
)
if dialOpts == nil {
return errors.New("no dial options for beacon chain gRPC client")
}
s.ctx = grpcutil.AppendHeaders(s.ctx, s.clientGrpcHeaders)
s.ctx = grpcutil.AppendHeaders(s.ctx, s.grpcHeaders)
grpcConn, err := grpc.DialContext(s.ctx, s.beaconClientEndpoint, dialOpts...)
grpcConn, err := grpc.DialContext(s.ctx, s.beaconNodeEndpoint, dialOpts...)
if err != nil {
return errors.Wrapf(err, "could not dial endpoint: %s", s.beaconClientEndpoint)
return errors.Wrapf(err, "could not dial endpoint: %s", s.beaconNodeEndpoint)
}
if s.clientWithCert != "" {
if s.beaconNodeCert != "" {
log.Info("Established secure gRPC connection")
}
s.beaconNodeHealthClient = ethpb.NewHealthClient(grpcConn)
s.healthClient = ethpb.NewHealthClient(grpcConn)
conn := validatorHelpers.NewNodeConnection(
grpcConn,
@@ -56,8 +56,8 @@ func (s *Server) registerBeaconClient() error {
restHandler := beaconApi.NewBeaconApiJsonRestHandler(http.Client{Timeout: s.beaconApiTimeout}, s.beaconApiEndpoint)
s.beaconChainClient = beaconChainClientFactory.NewBeaconChainClient(conn, restHandler)
s.beaconNodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
s.chainClient = beaconChainClientFactory.NewChainClient(conn, restHandler)
s.nodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn, restHandler)
return nil

View File

@@ -11,8 +11,8 @@ import (
func TestGrpcHeaders(t *testing.T) {
s := &Server{
ctx: context.Background(),
clientGrpcHeaders: []string{"first=value1", "second=value2"},
ctx: context.Background(),
grpcHeaders: []string{"first=value1", "second=value2"},
}
err := s.registerBeaconClient()
require.NoError(t, err)

View File

@@ -258,7 +258,7 @@ func (s *Server) VoluntaryExit(w http.ResponseWriter, r *http.Request) {
}
cfg := accounts.PerformExitCfg{
ValidatorClient: s.beaconNodeValidatorClient,
NodeClient: s.beaconNodeClient,
NodeClient: s.nodeClient,
Keymanager: km,
RawPubKeys: pubKeys,
FormattedPubKeys: req.PublicKeys,

View File

@@ -292,7 +292,7 @@ func TestServer_VoluntaryExit(t *testing.T) {
s := &Server{
walletInitialized: true,
wallet: w,
beaconNodeClient: mockNodeClient,
nodeClient: mockNodeClient,
beaconNodeValidatorClient: mockValidatorClient,
validatorService: vs,
}

View File

@@ -25,30 +25,30 @@ import (
func (s *Server) GetBeaconStatus(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.beacon.GetBeaconStatus")
defer span.End()
syncStatus, err := s.beaconNodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
syncStatus, err := s.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
if err != nil {
log.WithError(err).Error("beacon node call to get sync status failed")
httputil.WriteJson(w, &BeaconStatusResponse{
BeaconNodeEndpoint: s.nodeGatewayEndpoint,
BeaconNodeEndpoint: s.beaconNodeEndpoint,
Connected: false,
Syncing: false,
})
return
}
genesis, err := s.beaconNodeClient.GetGenesis(ctx, &emptypb.Empty{})
genesis, err := s.nodeClient.GetGenesis(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "GetGenesis call failed").Error(), http.StatusInternalServerError)
return
}
genesisTime := uint64(time.Unix(genesis.GenesisTime.Seconds, 0).Unix())
address := genesis.DepositContractAddress
chainHead, err := s.beaconChainClient.GetChainHead(ctx, &emptypb.Empty{})
chainHead, err := s.chainClient.GetChainHead(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "GetChainHead").Error(), http.StatusInternalServerError)
return
}
httputil.WriteJson(w, &BeaconStatusResponse{
BeaconNodeEndpoint: s.beaconClientEndpoint,
BeaconNodeEndpoint: s.beaconNodeEndpoint,
Connected: true,
Syncing: syncStatus.Syncing,
GenesisTime: fmt.Sprintf("%d", genesisTime),
@@ -85,7 +85,7 @@ func (s *Server) GetValidatorPerformance(w http.ResponseWriter, r *http.Request)
req := &ethpb.ValidatorPerformanceRequest{
PublicKeys: pubkeys,
}
validatorPerformance, err := s.beaconChainClient.GetValidatorPerformance(ctx, req)
validatorPerformance, err := s.chainClient.GetValidatorPerformance(ctx, req)
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "GetValidatorPerformance call failed").Error(), http.StatusInternalServerError)
return
@@ -133,7 +133,7 @@ func (s *Server) GetValidatorBalances(w http.ResponseWriter, r *http.Request) {
PageSize: int32(ps),
PageToken: pageToken,
}
listValidatorBalances, err := s.beaconChainClient.ListValidatorBalances(ctx, req)
listValidatorBalances, err := s.chainClient.ListValidatorBalances(ctx, req)
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "ListValidatorBalances call failed").Error(), http.StatusInternalServerError)
return
@@ -187,7 +187,7 @@ func (s *Server) GetValidators(w http.ResponseWriter, r *http.Request) {
PageSize: int32(ps),
PageToken: pageToken,
}
validators, err := s.beaconChainClient.ListValidators(ctx, req)
validators, err := s.chainClient.ListValidators(ctx, req)
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "ListValidators call failed").Error(), http.StatusInternalServerError)
return
@@ -204,7 +204,7 @@ func (s *Server) GetValidators(w http.ResponseWriter, r *http.Request) {
func (s *Server) GetPeers(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.beacon.GetPeers")
defer span.End()
peers, err := s.beaconNodeClient.ListPeers(ctx, &emptypb.Empty{})
peers, err := s.nodeClient.ListPeers(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "ListPeers call failed").Error(), http.StatusInternalServerError)
return

View File

@@ -27,7 +27,7 @@ func TestGetBeaconStatus_NotConnected(t *testing.T) {
gomock.Any(),
).Return(nil /*response*/, errors.New("uh oh"))
srv := &Server{
beaconNodeClient: nodeClient,
nodeClient: nodeClient,
}
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/v2/validator/beacon/status"), nil)
wr := httptest.NewRecorder()
@@ -47,7 +47,7 @@ func TestGetBeaconStatus_NotConnected(t *testing.T) {
func TestGetBeaconStatus_OK(t *testing.T) {
ctrl := gomock.NewController(t)
nodeClient := validatormock.NewMockNodeClient(ctrl)
beaconChainClient := validatormock.NewMockBeaconChainClient(ctrl)
chainClient := validatormock.NewMockChainClient(ctrl)
nodeClient.EXPECT().GetSyncStatus(
gomock.Any(), // ctx
gomock.Any(),
@@ -60,15 +60,15 @@ func TestGetBeaconStatus_OK(t *testing.T) {
GenesisTime: timeStamp,
DepositContractAddress: []byte("hello"),
}, nil)
beaconChainClient.EXPECT().GetChainHead(
chainClient.EXPECT().GetChainHead(
gomock.Any(), // ctx
gomock.Any(),
).Return(&ethpb.ChainHead{
HeadEpoch: 1,
}, nil)
srv := &Server{
beaconNodeClient: nodeClient,
beaconChainClient: beaconChainClient,
nodeClient: nodeClient,
chainClient: chainClient,
}
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/v2/validator/beacon/status"), nil)
@@ -228,7 +228,7 @@ func TestServer_GetValidators(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
beaconChainClient := validatormock.NewMockBeaconChainClient(ctrl)
beaconChainClient := validatormock.NewMockChainClient(ctrl)
if tt.wantErr == "" {
beaconChainClient.EXPECT().ListValidators(
gomock.Any(), // ctx
@@ -236,7 +236,7 @@ func TestServer_GetValidators(t *testing.T) {
).Return(tt.chainResp, nil)
}
s := &Server{
beaconChainClient: beaconChainClient,
chainClient: beaconChainClient,
}
req := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/v2/validator/beacon/validators?%s", tt.query), http.NoBody)
wr := httptest.NewRecorder()

View File

@@ -18,7 +18,7 @@ func (s *Server) GetVersion(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.GetVersion")
defer span.End()
beacon, err := s.beaconNodeClient.GetVersion(ctx, &emptypb.Empty{})
beacon, err := s.nodeClient.GetVersion(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
@@ -51,7 +51,7 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
return
}
// TODO: StreamBeaconLogs grpc will need to be replaced in the future
client, err := s.beaconNodeHealthClient.StreamBeaconLogs(ctx, &emptypb.Empty{})
client, err := s.healthClient.StreamBeaconLogs(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
@@ -102,8 +102,8 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
return
}
ch := make(chan []byte, s.streamLogsBufferSize)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
ch := make(chan []byte, s.logStreamerBufferSize)
sub := s.logStreamer.LogsFeed().Subscribe(ch)
defer func() {
sub.Unsubscribe()
close(ch)
@@ -113,7 +113,7 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", api.KeepAlive)
recentLogs := s.logsStreamer.GetLastFewLogs()
recentLogs := s.logStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
for i, l := range recentLogs {
logStrings[i] = string(l)

View File

@@ -73,8 +73,8 @@ func TestStreamBeaconLogs(t *testing.T) {
// Setting up the mock in the server struct
s := Server{
ctx: context.Background(),
beaconNodeHealthClient: mockClient,
ctx: context.Background(),
healthClient: mockClient,
}
// Create a mock ResponseWriter and Request
@@ -119,9 +119,9 @@ func TestStreamValidatorLogs(t *testing.T) {
logStreamer := mock.NewMockStreamer(mockLogs)
// Setting up the mock in the server struct
s := Server{
ctx: ctx,
logsStreamer: logStreamer,
streamLogsBufferSize: 100,
ctx: ctx,
logStreamer: logStreamer,
logStreamerBufferSize: 100,
}
w := &flushableResponseRecorder{
@@ -170,8 +170,8 @@ func TestServer_GetVersion(t *testing.T) {
ctx := context.Background()
mockNodeClient := validatormock.NewMockNodeClient(ctrl)
s := Server{
ctx: ctx,
beaconNodeClient: mockNodeClient,
ctx: ctx,
nodeClient: mockNodeClient,
}
mockNodeClient.EXPECT().GetVersion(gomock.Any(), gomock.Any()).Return(&eth.Version{
Version: "4.10.1",

View File

@@ -133,7 +133,7 @@ func (s *Server) ImportKeystores(w http.ResponseWriter, r *http.Request) {
keystores[i] = k
}
if req.SlashingProtection != "" {
if s.valDB == nil || s.valDB.ImportStandardProtectionJSON(ctx, bytes.NewBufferString(req.SlashingProtection)) != nil {
if s.db == nil || s.db.ImportStandardProtectionJSON(ctx, bytes.NewBufferString(req.SlashingProtection)) != nil {
statuses := make([]*keymanager.KeyStatus, len(req.Keystores))
for i := 0; i < len(req.Keystores); i++ {
statuses[i] = &keymanager.KeyStatus{
@@ -285,11 +285,11 @@ func (s *Server) transformDeletedKeysStatuses(
// Gets a map of all public keys in the database, useful for O(1) lookups.
func (s *Server) publicKeysInDB(ctx context.Context) (map[[fieldparams.BLSPubkeyLength]byte]bool, error) {
pubKeysInDB := make(map[[fieldparams.BLSPubkeyLength]byte]bool)
attestedPublicKeys, err := s.valDB.AttestedPublicKeys(ctx)
attestedPublicKeys, err := s.db.AttestedPublicKeys(ctx)
if err != nil {
return nil, fmt.Errorf("could not get attested public keys from DB: %v", err)
}
proposedPublicKeys, err := s.valDB.ProposedPublicKeys(ctx)
proposedPublicKeys, err := s.db.ProposedPublicKeys(ctx)
if err != nil {
return nil, fmt.Errorf("could not get proposed public keys from DB: %v", err)
}
@@ -313,7 +313,7 @@ func (s *Server) slashingProtectionHistoryForDeletedKeys(
filteredKeys = append(filteredKeys, pk)
}
}
return slashingprotection.ExportStandardProtectionJSON(ctx, s.valDB, filteredKeys...)
return slashingprotection.ExportStandardProtectionJSON(ctx, s.db, filteredKeys...)
}
// SetVoluntaryExit creates a signed voluntary exit message and returns a VoluntaryExit object.
@@ -347,7 +347,7 @@ func (s *Server) SetVoluntaryExit(w http.ResponseWriter, r *http.Request) {
epoch := primitives.Epoch(e)
if rawEpoch == "" {
genesisResponse, err := s.beaconNodeClient.GetGenesis(ctx, &emptypb.Empty{})
genesisResponse, err := s.nodeClient.GetGenesis(ctx, &emptypb.Empty{})
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "Failed to get genesis time").Error(), http.StatusInternalServerError)
return
@@ -414,7 +414,7 @@ func (s *Server) ListRemoteKeys(w http.ResponseWriter, r *http.Request) {
for i := 0; i < len(pubKeys); i++ {
keystoreResponse[i] = &RemoteKey{
Pubkey: hexutil.Encode(pubKeys[i][:]),
Url: s.validatorService.Web3SignerConfig.BaseEndpoint,
Url: s.validatorService.RemoteSignerConfig().BaseEndpoint,
Readonly: true,
}
}

View File

@@ -289,7 +289,7 @@ func TestServer_ImportKeystores(t *testing.T) {
})
}
require.NoError(t, err)
s.valDB = validatorDB
s.db = validatorDB
// Have to close it after import is done otherwise it complains db is not open.
defer func() {
@@ -413,7 +413,7 @@ func TestServer_DeleteKeystores(t *testing.T) {
})
}
require.NoError(t, err)
srv.valDB = validatorDB
srv.db = validatorDB
// Have to close it after import is done otherwise it complains db is not open.
defer func() {
@@ -589,7 +589,7 @@ func TestServer_DeleteKeystores_FailedSlashingProtectionExport(t *testing.T) {
require.NoError(t, err)
err = validatorDB.SaveGenesisValidatorsRoot(ctx, make([]byte, fieldparams.RootLength))
require.NoError(t, err)
srv.valDB = validatorDB
srv.db = validatorDB
// Have to close it after import is done otherwise it complains db is not open.
defer func() {
@@ -746,7 +746,7 @@ func TestServer_SetVoluntaryExit(t *testing.T) {
validatorService: vs,
beaconNodeValidatorClient: beaconClient,
wallet: w,
beaconNodeClient: mockNodeClient,
nodeClient: mockNodeClient,
walletInitialized: w != nil,
}
@@ -841,7 +841,7 @@ func TestServer_SetVoluntaryExit(t *testing.T) {
resp := &SetVoluntaryExitResponse{}
require.NoError(t, json.Unmarshal(w.Body.Bytes(), resp))
if tt.w.epoch == 0 {
genesisResponse, err := s.beaconNodeClient.GetGenesis(ctx, &emptypb.Empty{})
genesisResponse, err := s.nodeClient.GetGenesis(ctx, &emptypb.Empty{})
require.NoError(t, err)
tt.w.epoch, err = client.CurrentEpoch(genesisResponse.GenesisTime)
require.NoError(t, err)
@@ -1091,14 +1091,14 @@ func TestServer_SetGasLimit(t *testing.T) {
validatorDB := dbtest.SetupDB(t, [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Validator: m,
ValDB: validatorDB,
DB: validatorDB,
})
require.NoError(t, err)
s := &Server{
validatorService: vs,
beaconNodeValidatorClient: beaconClient,
valDB: validatorDB,
db: validatorDB,
}
if tt.beaconReturn != nil {
@@ -1280,12 +1280,12 @@ func TestServer_DeleteGasLimit(t *testing.T) {
validatorDB := dbtest.SetupDB(t, [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Validator: m,
ValDB: validatorDB,
DB: validatorDB,
})
require.NoError(t, err)
s := &Server{
validatorService: vs,
valDB: validatorDB,
db: validatorDB,
}
// Set up global default value for builder gas limit.
params.BeaconConfig().DefaultBuilderGasLimit = uint64(globalDefaultGasLimit)
@@ -1744,13 +1744,13 @@ func TestServer_FeeRecipientByPubkey(t *testing.T) {
// save a default here
vs, err := client.NewValidatorService(ctx, &client.Config{
Validator: m,
ValDB: validatorDB,
DB: validatorDB,
})
require.NoError(t, err)
s := &Server{
validatorService: vs,
beaconNodeValidatorClient: beaconClient,
valDB: validatorDB,
db: validatorDB,
}
request := &SetFeeRecipientByPubkeyRequest{
Ethaddress: tt.args,
@@ -1854,12 +1854,12 @@ func TestServer_DeleteFeeRecipientByPubkey(t *testing.T) {
validatorDB := dbtest.SetupDB(t, [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Validator: m,
ValDB: validatorDB,
DB: validatorDB,
})
require.NoError(t, err)
s := &Server{
validatorService: vs,
valDB: validatorDB,
db: validatorDB,
}
req := httptest.NewRequest(http.MethodDelete, fmt.Sprintf("/eth/v1/validator/{pubkey}/feerecipient"), nil)
req = mux.SetURLVars(req, map[string]string{"pubkey": pubkey})

View File

@@ -24,12 +24,12 @@ func (s *Server) ExportSlashingProtection(w http.ResponseWriter, r *http.Request
ctx, span := trace.StartSpan(r.Context(), "validator.ExportSlashingProtection")
defer span.End()
if s.valDB == nil {
if s.db == nil {
httputil.HandleError(w, "could not find validator database", http.StatusInternalServerError)
return
}
eipJSON, err := slashing.ExportStandardProtectionJSON(ctx, s.valDB)
eipJSON, err := slashing.ExportStandardProtectionJSON(ctx, s.db)
if err != nil {
httputil.HandleError(w, errors.Wrap(err, "could not export slashing protection history").Error(), http.StatusInternalServerError)
return
@@ -54,7 +54,7 @@ func (s *Server) ImportSlashingProtection(w http.ResponseWriter, r *http.Request
ctx, span := trace.StartSpan(r.Context(), "validator.ImportSlashingProtection")
defer span.End()
if s.valDB == nil {
if s.db == nil {
httputil.HandleError(w, "could not find validator database", http.StatusInternalServerError)
return
}
@@ -76,7 +76,7 @@ func (s *Server) ImportSlashingProtection(w http.ResponseWriter, r *http.Request
}
enc := []byte(req.SlashingProtectionJson)
buf := bytes.NewBuffer(enc)
if err := s.valDB.ImportStandardProtectionJSON(ctx, buf); err != nil {
if err := s.db.ImportStandardProtectionJSON(ctx, buf); err != nil {
httputil.HandleError(w, errors.Wrap(err, "could not import slashing protection history").Error(), http.StatusInternalServerError)
return
}

View File

@@ -77,7 +77,7 @@ func TestImportSlashingProtection_Preconditions(t *testing.T) {
})
}
require.NoError(t, err)
s.valDB = validatorDB
s.db = validatorDB
// Have to close it after import is done otherwise it complains db is not open.
defer func() {
@@ -151,7 +151,7 @@ func TestExportSlashingProtection_Preconditions(t *testing.T) {
})
}
require.NoError(t, err)
s.valDB = validatorDB
s.db = validatorDB
// Have to close it after export is done otherwise it complains db is not open.
defer func() {
@@ -189,7 +189,7 @@ func TestImportExportSlashingProtection_RoundTrip(t *testing.T) {
PubKeys: pubKeys,
})
require.NoError(t, err)
s.valDB = validatorDB
s.db = validatorDB
// Have to close it after import is done otherwise it complains db is not open.
defer func() {

View File

@@ -23,122 +23,83 @@ import (
"github.com/prysmaticlabs/prysm/v5/validator/client"
iface "github.com/prysmaticlabs/prysm/v5/validator/client/iface"
"github.com/prysmaticlabs/prysm/v5/validator/db"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
)
// Config options for the gRPC server.
type Config struct {
ValidatorGatewayHost string
ValidatorGatewayPort int
ValidatorMonitoringHost string
ValidatorMonitoringPort int
BeaconClientEndpoint string
ClientMaxCallRecvMsgSize int
ClientGrpcRetries uint
ClientGrpcRetryDelay time.Duration
ClientGrpcHeaders []string
ClientWithCert string
Host string
Port string
CertFlag string
KeyFlag string
ValDB db.Database
AuthTokenPath string
WalletDir string
ValidatorService *client.ValidatorService
SyncChecker client.SyncChecker
GenesisFetcher client.GenesisFetcher
WalletInitializedFeed *event.Feed
NodeGatewayEndpoint string
BeaconApiEndpoint string
BeaconApiTimeout time.Duration
Router *mux.Router
Wallet *wallet.Wallet
Host string
Port string
GRPCGatewayHost string
GRPCGatewayPort int
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
GRPCRetryDelay time.Duration
GRPCHeaders []string
BeaconNodeGRPCEndpoint string
BeaconApiEndpoint string
BeaconApiTimeout time.Duration
BeaconNodeCert string
DB db.Database
Wallet *wallet.Wallet
WalletDir string
WalletInitializedFeed *event.Feed
ValidatorService *client.ValidatorService
Router *mux.Router
}
// Server defining a gRPC server for the remote signer API.
type Server struct {
logsStreamer logs.Streamer
streamLogsBufferSize int
beaconChainClient iface.BeaconChainClient
beaconNodeClient iface.NodeClient
beaconNodeValidatorClient iface.ValidatorClient
beaconNodeHealthClient ethpb.HealthClient
valDB db.Database
ctx context.Context
cancel context.CancelFunc
beaconClientEndpoint string
clientMaxCallRecvMsgSize int
clientGrpcRetries uint
clientGrpcRetryDelay time.Duration
clientGrpcHeaders []string
clientWithCert string
host string
port string
grpcGatewayHost string
grpcGatewayPort int
listener net.Listener
withCert string
withKey string
credentialError error
grpcMaxCallRecvMsgSize int
grpcRetries uint
grpcRetryDelay time.Duration
grpcHeaders []string
grpcServer *grpc.Server
beaconNodeValidatorClient iface.ValidatorClient
chainClient iface.ChainClient
nodeClient iface.NodeClient
healthClient ethpb.HealthClient
beaconNodeEndpoint string
beaconApiEndpoint string
beaconApiTimeout time.Duration
beaconNodeCert string
jwtSecret []byte
validatorService *client.ValidatorService
syncChecker client.SyncChecker
genesisFetcher client.GenesisFetcher
authTokenPath string
authToken string
db db.Database
walletDir string
wallet *wallet.Wallet
walletInitializedFeed *event.Feed
walletInitialized bool
nodeGatewayEndpoint string
validatorMonitoringHost string
validatorMonitoringPort int
validatorGatewayHost string
validatorGatewayPort int
beaconApiEndpoint string
beaconApiTimeout time.Duration
validatorService *client.ValidatorService
router *mux.Router
logStreamer logs.Streamer
logStreamerBufferSize int
}
// NewServer instantiates a new gRPC server.
func NewServer(ctx context.Context, cfg *Config) *Server {
ctx, cancel := context.WithCancel(ctx)
server := &Server{
ctx: ctx,
cancel: cancel,
logsStreamer: logs.NewStreamServer(),
streamLogsBufferSize: 1000, // Enough to handle most bursts of logs in the validator client.
host: cfg.Host,
port: cfg.Port,
withCert: cfg.CertFlag,
withKey: cfg.KeyFlag,
beaconClientEndpoint: cfg.BeaconClientEndpoint,
clientMaxCallRecvMsgSize: cfg.ClientMaxCallRecvMsgSize,
clientGrpcRetries: cfg.ClientGrpcRetries,
clientGrpcRetryDelay: cfg.ClientGrpcRetryDelay,
clientGrpcHeaders: cfg.ClientGrpcHeaders,
clientWithCert: cfg.ClientWithCert,
valDB: cfg.ValDB,
validatorService: cfg.ValidatorService,
syncChecker: cfg.SyncChecker,
genesisFetcher: cfg.GenesisFetcher,
authTokenPath: cfg.AuthTokenPath,
walletDir: cfg.WalletDir,
walletInitializedFeed: cfg.WalletInitializedFeed,
walletInitialized: cfg.Wallet != nil,
wallet: cfg.Wallet,
nodeGatewayEndpoint: cfg.NodeGatewayEndpoint,
validatorMonitoringHost: cfg.ValidatorMonitoringHost,
validatorMonitoringPort: cfg.ValidatorMonitoringPort,
validatorGatewayHost: cfg.ValidatorGatewayHost,
validatorGatewayPort: cfg.ValidatorGatewayPort,
beaconApiTimeout: cfg.BeaconApiTimeout,
beaconApiEndpoint: cfg.BeaconApiEndpoint,
router: cfg.Router,
ctx: ctx,
cancel: cancel,
host: cfg.Host,
port: cfg.Port,
grpcGatewayHost: cfg.GRPCGatewayHost,
grpcGatewayPort: cfg.GRPCGatewayPort,
grpcMaxCallRecvMsgSize: cfg.GRPCMaxCallRecvMsgSize,
grpcRetries: cfg.GRPCRetries,
grpcRetryDelay: cfg.GRPCRetryDelay,
grpcHeaders: cfg.GRPCHeaders,
}
if server.authTokenPath == "" && server.walletDir != "" {
@@ -149,7 +110,7 @@ func NewServer(ctx context.Context, cfg *Config) *Server {
if err := server.initializeAuthToken(); err != nil {
log.WithError(err).Error("Could not initialize web auth token")
}
validatorWebAddr := fmt.Sprintf("%s:%d", server.validatorGatewayHost, server.validatorGatewayPort)
validatorWebAddr := fmt.Sprintf("%s:%d", server.grpcGatewayHost, server.grpcGatewayPort)
logValidatorWebAuth(validatorWebAddr, server.authToken, server.authTokenPath)
go server.refreshAuthTokenFromFileChanges(server.ctx, server.authTokenPath)
}
@@ -184,17 +145,7 @@ func (s *Server) Start() {
)),
}
grpcprometheus.EnableHandlingTimeHistogram()
if s.withCert != "" && s.withKey != "" {
creds, err := credentials.NewServerTLSFromFile(s.withCert, s.withKey)
if err != nil {
log.WithError(err).Fatal("Could not load TLS keys")
}
opts = append(opts, grpc.Creds(creds))
log.WithFields(logrus.Fields{
"certPath": s.withCert,
"keyPath": s.withKey,
}).Info("Loaded TLS certificates")
}
s.grpcServer = grpc.NewServer(opts...)
// Register a gRPC client to the beacon node.
@@ -282,7 +233,7 @@ func (s *Server) Stop() error {
return nil
}
// Status returns nil or credentialError.
// Status returns an error if the service is unhealthy.
func (s *Server) Status() error {
return s.credentialError
return nil
}