mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
9 Commits
process-ex
...
gRPC-fallb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f828bdd88 | ||
|
|
17413b52ed | ||
|
|
a651e7f0ac | ||
|
|
3e1cb45e92 | ||
|
|
fc2dcb0e88 | ||
|
|
888db581dd | ||
|
|
f1d2ee72e2 | ||
|
|
31f18b9f60 | ||
|
|
6462c997e9 |
@@ -10,18 +10,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func NewChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.ChainClient {
|
func NewChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.ChainClient {
|
||||||
grpcClient := grpcApi.NewGrpcChainClient(validatorConn.GetGrpcClientConn())
|
grpcClient := grpcApi.NewGrpcChainClientWithConnection(validatorConn)
|
||||||
if features.Get().EnableBeaconRESTApi {
|
if features.Get().EnableBeaconRESTApi {
|
||||||
return beaconApi.NewBeaconApiChainClientWithFallback(jsonRestHandler, grpcClient)
|
return beaconApi.NewBeaconApiChainClientWithFallback(jsonRestHandler, grpcClient)
|
||||||
} else {
|
|
||||||
return grpcClient
|
|
||||||
}
|
}
|
||||||
|
return grpcClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.PrysmChainClient {
|
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.PrysmChainClient {
|
||||||
if features.Get().EnableBeaconRESTApi {
|
if features.Get().EnableBeaconRESTApi {
|
||||||
return beaconApi.NewPrysmChainClient(jsonRestHandler, nodeClientFactory.NewNodeClient(validatorConn, jsonRestHandler))
|
return beaconApi.NewPrysmChainClient(jsonRestHandler, nodeClientFactory.NewNodeClient(validatorConn, jsonRestHandler))
|
||||||
} else {
|
|
||||||
return grpcApi.NewGrpcPrysmChainClient(validatorConn.GetGrpcClientConn())
|
|
||||||
}
|
}
|
||||||
|
return grpcApi.NewGrpcPrysmChainClientWithConnection(validatorConn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ go_library(
|
|||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"grpc_beacon_chain_client.go",
|
"grpc_beacon_chain_client.go",
|
||||||
|
"grpc_client_manager.go",
|
||||||
"grpc_node_client.go",
|
"grpc_node_client.go",
|
||||||
"grpc_prysm_beacon_chain_client.go",
|
"grpc_prysm_beacon_chain_client.go",
|
||||||
"grpc_validator_client.go",
|
"grpc_validator_client.go",
|
||||||
@@ -25,6 +26,7 @@ go_library(
|
|||||||
"//proto/eth/v1:go_default_library",
|
"//proto/eth/v1:go_default_library",
|
||||||
"//proto/prysm/v1alpha1:go_default_library",
|
"//proto/prysm/v1alpha1:go_default_library",
|
||||||
"//validator/client/iface:go_default_library",
|
"//validator/client/iface:go_default_library",
|
||||||
|
"//validator/helpers:go_default_library",
|
||||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||||
"@com_github_golang_protobuf//ptypes/empty",
|
"@com_github_golang_protobuf//ptypes/empty",
|
||||||
"@com_github_pkg_errors//:go_default_library",
|
"@com_github_pkg_errors//:go_default_library",
|
||||||
|
|||||||
@@ -5,38 +5,42 @@ import (
|
|||||||
|
|
||||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcChainClient struct {
|
type grpcChainClient struct {
|
||||||
beaconChainClient ethpb.BeaconChainClient
|
*grpcClientManager[ethpb.BeaconChainClient]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) ChainHead(ctx context.Context, in *empty.Empty) (*ethpb.ChainHead, error) {
|
func (c *grpcChainClient) ChainHead(ctx context.Context, in *empty.Empty) (*ethpb.ChainHead, error) {
|
||||||
return c.beaconChainClient.GetChainHead(ctx, in)
|
return c.getClient().GetChainHead(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) ValidatorBalances(ctx context.Context, in *ethpb.ListValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
|
func (c *grpcChainClient) ValidatorBalances(ctx context.Context, in *ethpb.ListValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
|
||||||
return c.beaconChainClient.ListValidatorBalances(ctx, in)
|
return c.getClient().ListValidatorBalances(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) Validators(ctx context.Context, in *ethpb.ListValidatorsRequest) (*ethpb.Validators, error) {
|
func (c *grpcChainClient) Validators(ctx context.Context, in *ethpb.ListValidatorsRequest) (*ethpb.Validators, error) {
|
||||||
return c.beaconChainClient.ListValidators(ctx, in)
|
return c.getClient().ListValidators(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) ValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error) {
|
func (c *grpcChainClient) ValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error) {
|
||||||
return c.beaconChainClient.GetValidatorQueue(ctx, in)
|
return c.getClient().GetValidatorQueue(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) ValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error) {
|
func (c *grpcChainClient) ValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error) {
|
||||||
return c.beaconChainClient.GetValidatorPerformance(ctx, in)
|
return c.getClient().GetValidatorPerformance(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcChainClient) ValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error) {
|
func (c *grpcChainClient) ValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error) {
|
||||||
return c.beaconChainClient.GetValidatorParticipation(ctx, in)
|
return c.getClient().GetValidatorParticipation(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcChainClient(cc grpc.ClientConnInterface) iface.ChainClient {
|
// NewGrpcChainClientWithConnection creates a new gRPC chain client that supports
|
||||||
return &grpcChainClient{ethpb.NewBeaconChainClient(cc)}
|
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||||
|
func NewGrpcChainClientWithConnection(conn validatorHelpers.NodeConnection) iface.ChainClient {
|
||||||
|
return &grpcChainClient{
|
||||||
|
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconChainClient),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
61
validator/client/grpc-api/grpc_client_manager.go
Normal file
61
validator/client/grpc-api/grpc_client_manager.go
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
package grpc_api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
// grpcClientManager handles dynamic gRPC client recreation when the connection changes.
|
||||||
|
// It uses generics to work with any gRPC client type.
|
||||||
|
type grpcClientManager[T any] struct {
|
||||||
|
conn validatorHelpers.NodeConnection
|
||||||
|
client T
|
||||||
|
lastHost string
|
||||||
|
clientMu sync.RWMutex
|
||||||
|
newClient func(grpc.ClientConnInterface) T
|
||||||
|
}
|
||||||
|
|
||||||
|
// newGrpcClientManager creates a new client manager with the given connection and client constructor.
|
||||||
|
func newGrpcClientManager[T any](
|
||||||
|
conn validatorHelpers.NodeConnection,
|
||||||
|
newClient func(grpc.ClientConnInterface) T,
|
||||||
|
) *grpcClientManager[T] {
|
||||||
|
m := &grpcClientManager[T]{
|
||||||
|
conn: conn,
|
||||||
|
newClient: newClient,
|
||||||
|
client: newClient(conn.GetGrpcClientConn()),
|
||||||
|
}
|
||||||
|
if provider := conn.GetGrpcConnectionProvider(); provider != nil {
|
||||||
|
m.lastHost = provider.CurrentHost()
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// getClient returns the current client, recreating it if the connection has changed.
|
||||||
|
func (m *grpcClientManager[T]) getClient() T {
|
||||||
|
if m.conn == nil || m.conn.GetGrpcConnectionProvider() == nil {
|
||||||
|
return m.client
|
||||||
|
}
|
||||||
|
|
||||||
|
currentHost := m.conn.GetGrpcConnectionProvider().CurrentHost()
|
||||||
|
m.clientMu.RLock()
|
||||||
|
if m.lastHost == currentHost {
|
||||||
|
client := m.client
|
||||||
|
m.clientMu.RUnlock()
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
m.clientMu.RUnlock()
|
||||||
|
|
||||||
|
// Connection changed, need to recreate client
|
||||||
|
m.clientMu.Lock()
|
||||||
|
defer m.clientMu.Unlock()
|
||||||
|
// Double-check after acquiring write lock
|
||||||
|
if m.lastHost == currentHost {
|
||||||
|
return m.client
|
||||||
|
}
|
||||||
|
m.client = m.newClient(m.conn.GetGrpcClientConn())
|
||||||
|
m.lastHost = currentHost
|
||||||
|
return m.client
|
||||||
|
}
|
||||||
@@ -5,8 +5,8 @@ import (
|
|||||||
|
|
||||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -14,35 +14,48 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcNodeClient struct {
|
type grpcNodeClient struct {
|
||||||
nodeClient ethpb.NodeClient
|
*grpcClientManager[ethpb.NodeClient]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
|
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
|
||||||
return c.nodeClient.GetSyncStatus(ctx, in)
|
return c.getClient().GetSyncStatus(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcNodeClient) Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) {
|
func (c *grpcNodeClient) Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) {
|
||||||
return c.nodeClient.GetGenesis(ctx, in)
|
return c.getClient().GetGenesis(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcNodeClient) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) {
|
func (c *grpcNodeClient) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) {
|
||||||
return c.nodeClient.GetVersion(ctx, in)
|
return c.getClient().GetVersion(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) {
|
func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) {
|
||||||
return c.nodeClient.ListPeers(ctx, in)
|
return c.getClient().ListPeers(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
|
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
|
||||||
_, err := c.nodeClient.GetHealth(ctx, ðpb.HealthRequest{})
|
_, err := c.nodeClient.GetHealth(ctx, ðpb.HealthRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("Failed to get health of node")
|
log.WithError(err).Debug("Failed to get health of node")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Then check sync status - we only want fully synced nodes
|
||||||
|
syncStatus, err := c.getClient().GetSyncStatus(ctx, &empty.Empty{})
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Debug("Failed to get sync status of node")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if syncStatus.Syncing {
|
||||||
|
log.Debug("Node is syncing, not fully synced")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
|
// NewNodeClientWithConnection creates a new gRPC node client that supports
|
||||||
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
|
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||||
return g
|
func NewNodeClientWithConnection(conn validatorHelpers.NodeConnection) iface.NodeClient {
|
||||||
|
return &grpcNodeClient{
|
||||||
|
grpcClientManager: newGrpcClientManager(conn, ethpb.NewNodeClient),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,9 +12,9 @@ import (
|
|||||||
eth "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
|
eth "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
|
||||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcPrysmChainClient struct {
|
type grpcPrysmChainClient struct {
|
||||||
@@ -95,6 +95,8 @@ func (c *grpcPrysmChainClient) ValidatorPerformance(ctx context.Context, in *eth
|
|||||||
return c.chainClient.ValidatorPerformance(ctx, in)
|
return c.chainClient.ValidatorPerformance(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcPrysmChainClient(cc grpc.ClientConnInterface) iface.PrysmChainClient {
|
// NewGrpcPrysmChainClientWithConnection creates a new gRPC Prysm chain client that supports
|
||||||
return &grpcPrysmChainClient{chainClient: &grpcChainClient{ethpb.NewBeaconChainClient(cc)}}
|
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||||
|
func NewGrpcPrysmChainClientWithConnection(conn validatorHelpers.NodeConnection) iface.PrysmChainClient {
|
||||||
|
return &grpcPrysmChainClient{chainClient: NewGrpcChainClientWithConnection(conn)}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,24 +14,24 @@ import (
|
|||||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
||||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"google.golang.org/grpc"
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcValidatorClient struct {
|
type grpcValidatorClient struct {
|
||||||
beaconNodeValidatorClient ethpb.BeaconNodeValidatorClient
|
*grpcClientManager[ethpb.BeaconNodeValidatorClient]
|
||||||
isEventStreamRunning bool
|
isEventStreamRunning bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
||||||
if features.Get().DisableDutiesV2 {
|
if features.Get().DisableDutiesV2 {
|
||||||
return c.getDuties(ctx, in)
|
return c.getDuties(ctx, in)
|
||||||
}
|
}
|
||||||
dutiesResponse, err := c.beaconNodeValidatorClient.GetDutiesV2(ctx, in)
|
dutiesResponse, err := c.getClient().GetDutiesV2(ctx, in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if status.Code(err) == codes.Unimplemented {
|
if status.Code(err) == codes.Unimplemented {
|
||||||
log.Warn("GetDutiesV2 returned status code unavailable, falling back to GetDuties")
|
log.Warn("GetDutiesV2 returned status code unavailable, falling back to GetDuties")
|
||||||
@@ -47,7 +47,7 @@ func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesReques
|
|||||||
|
|
||||||
// getDuties is calling the v1 of get duties
|
// getDuties is calling the v1 of get duties
|
||||||
func (c *grpcValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
func (c *grpcValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
||||||
dutiesResponse, err := c.beaconNodeValidatorClient.GetDuties(ctx, in)
|
dutiesResponse, err := c.getClient().GetDuties(ctx, in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(
|
return nil, errors.Wrap(
|
||||||
client.ErrConnectionIssue,
|
client.ErrConnectionIssue,
|
||||||
@@ -147,108 +147,108 @@ func toValidatorDutyV2(duty *ethpb.DutiesV2Response_Duty) (*ethpb.ValidatorDuty,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
|
func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.CheckDoppelGanger(ctx, in)
|
return c.getClient().CheckDoppelGanger(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) {
|
func (c *grpcValidatorClient) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.DomainData(ctx, in)
|
return c.getClient().DomainData(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
|
func (c *grpcValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
|
||||||
return c.beaconNodeValidatorClient.GetAttestationData(ctx, in)
|
return c.getClient().GetAttestationData(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
|
func (c *grpcValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
|
||||||
return c.beaconNodeValidatorClient.GetBeaconBlock(ctx, in)
|
return c.getClient().GetBeaconBlock(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) FeeRecipientByPubKey(ctx context.Context, in *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
|
func (c *grpcValidatorClient) FeeRecipientByPubKey(ctx context.Context, in *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.GetFeeRecipientByPubKey(ctx, in)
|
return c.getClient().GetFeeRecipientByPubKey(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
|
func (c *grpcValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
|
||||||
return c.beaconNodeValidatorClient.GetSyncCommitteeContribution(ctx, in)
|
return c.getClient().GetSyncCommitteeContribution(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
|
func (c *grpcValidatorClient) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.GetSyncMessageBlockRoot(ctx, in)
|
return c.getClient().GetSyncMessageBlockRoot(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
|
func (c *grpcValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.GetSyncSubcommitteeIndex(ctx, in)
|
return c.getClient().GetSyncSubcommitteeIndex(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
|
func (c *grpcValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.MultipleValidatorStatus(ctx, in)
|
return c.getClient().MultipleValidatorStatus(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.PrepareBeaconProposer(ctx, in)
|
return c.getClient().PrepareBeaconProposer(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
|
func (c *grpcValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ProposeAttestation(ctx, in)
|
return c.getClient().ProposeAttestation(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ProposeAttestationElectra(ctx context.Context, in *ethpb.SingleAttestation) (*ethpb.AttestResponse, error) {
|
func (c *grpcValidatorClient) ProposeAttestationElectra(ctx context.Context, in *ethpb.SingleAttestation) (*ethpb.AttestResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ProposeAttestationElectra(ctx, in)
|
return c.getClient().ProposeAttestationElectra(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
|
func (c *grpcValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ProposeBeaconBlock(ctx, in)
|
return c.getClient().ProposeBeaconBlock(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
|
func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ProposeExit(ctx, in)
|
return c.getClient().ProposeExit(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
|
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
|
||||||
return c.beaconNodeValidatorClient.StreamBlocksAltair(ctx, in)
|
return c.getClient().StreamBlocksAltair(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionResponse, error) {
|
func (c *grpcValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProof(ctx, in)
|
return c.getClient().SubmitAggregateSelectionProof(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitAggregateSelectionProofElectra(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionElectraResponse, error) {
|
func (c *grpcValidatorClient) SubmitAggregateSelectionProofElectra(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionElectraResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProofElectra(ctx, in)
|
return c.getClient().SubmitAggregateSelectionProofElectra(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProof(ctx, in)
|
return c.getClient().SubmitSignedAggregateSelectionProof(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProofElectra(ctx, in)
|
return c.getClient().SubmitSignedAggregateSelectionProofElectra(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitSignedContributionAndProof(ctx, in)
|
return c.getClient().SubmitSignedContributionAndProof(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitSyncMessage(ctx, in)
|
return c.getClient().SubmitSyncMessage(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in)
|
return c.getClient().SubmitValidatorRegistrations(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in)
|
return c.getClient().SubscribeCommitteeSubnets(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
|
func (c *grpcValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ValidatorIndex(ctx, in)
|
return c.getClient().ValidatorIndex(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
|
func (c *grpcValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.ValidatorStatus(ctx, in)
|
return c.getClient().ValidatorStatus(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated: Do not use.
|
// Deprecated: Do not use.
|
||||||
func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) {
|
func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) {
|
||||||
stream, err := c.beaconNodeValidatorClient.WaitForChainStart(ctx, in)
|
stream, err := c.getClient().WaitForChainStart(ctx, in)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(
|
return nil, errors.Wrap(
|
||||||
client.ErrConnectionIssue,
|
client.ErrConnectionIssue,
|
||||||
@@ -260,13 +260,13 @@ func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.E
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) AssignValidatorToSubnet(ctx context.Context, in *ethpb.AssignValidatorToSubnetRequest) (*empty.Empty, error) {
|
func (c *grpcValidatorClient) AssignValidatorToSubnet(ctx context.Context, in *ethpb.AssignValidatorToSubnetRequest) (*empty.Empty, error) {
|
||||||
return c.beaconNodeValidatorClient.AssignValidatorToSubnet(ctx, in)
|
return c.getClient().AssignValidatorToSubnet(ctx, in)
|
||||||
}
|
}
|
||||||
func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
|
func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
in *ethpb.AggregatedSigAndAggregationBitsRequest,
|
in *ethpb.AggregatedSigAndAggregationBitsRequest,
|
||||||
) (*ethpb.AggregatedSigAndAggregationBitsResponse, error) {
|
) (*ethpb.AggregatedSigAndAggregationBitsResponse, error) {
|
||||||
return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in)
|
return c.getClient().AggregatedSigAndAggregationBits(ctx, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*grpcValidatorClient) AggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
|
func (*grpcValidatorClient) AggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
|
||||||
@@ -277,8 +277,12 @@ func (*grpcValidatorClient) AggregatedSyncSelections(context.Context, []iface.Sy
|
|||||||
return nil, iface.ErrNotSupported
|
return nil, iface.ErrNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
|
// NewGrpcValidatorClientWithConnection creates a new gRPC validator client that supports
|
||||||
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc), false}
|
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||||
|
func NewGrpcValidatorClientWithConnection(conn validatorHelpers.NodeConnection) iface.ValidatorClient {
|
||||||
|
return &grpcValidatorClient{
|
||||||
|
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconNodeValidatorClient),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
|
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
|
||||||
@@ -308,7 +312,7 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
|||||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||||
}
|
}
|
||||||
|
|
||||||
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
stream, err := c.getClient().StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
eventsChannel <- &eventClient.Event{
|
eventsChannel <- &eventClient.Event{
|
||||||
EventType: eventClient.EventConnectionError,
|
EventType: eventClient.EventConnectionError,
|
||||||
@@ -374,11 +378,29 @@ func (c *grpcValidatorClient) EventStreamIsRunning() bool {
|
|||||||
return c.isEventStreamRunning
|
return c.isEventStreamRunning
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*grpcValidatorClient) Host() string {
|
func (c *grpcValidatorClient) Host() string {
|
||||||
log.Warn(iface.ErrNotSupported)
|
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil || c.grpcClientManager.conn.GetGrpcConnectionProvider() == nil {
|
||||||
return ""
|
return ""
|
||||||
|
}
|
||||||
|
return c.grpcClientManager.conn.GetGrpcConnectionProvider().CurrentHost()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*grpcValidatorClient) SetHost(_ string) {
|
func (c *grpcValidatorClient) SetHost(host string) {
|
||||||
log.Warn(iface.ErrNotSupported)
|
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
provider := c.grpcClientManager.conn.GetGrpcConnectionProvider()
|
||||||
|
if provider == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Find the index of the requested host and switch to it
|
||||||
|
for i, h := range provider.Hosts() {
|
||||||
|
if h == host {
|
||||||
|
if err := provider.SetHost(i); err != nil {
|
||||||
|
log.WithError(err).WithField("host", host).Error("Failed to set gRPC host")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.WithField("host", host).Warn("Requested gRPC host not found in configured endpoints")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -133,7 +133,12 @@ func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
|
|||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
).Return(nil, errors.New("failed stream"))
|
).Return(nil, errors.New("failed stream"))
|
||||||
|
|
||||||
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
validatorClient := &grpcValidatorClient{
|
||||||
|
grpcClientManager: &grpcClientManager[eth.BeaconNodeValidatorClient]{
|
||||||
|
client: beaconNodeValidatorClient,
|
||||||
|
},
|
||||||
|
isEventStreamRunning: true,
|
||||||
|
}
|
||||||
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
|
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
|
||||||
want := "could not setup beacon chain ChainStart streaming client"
|
want := "could not setup beacon chain ChainStart streaming client"
|
||||||
assert.ErrorContains(t, want, err)
|
assert.ErrorContains(t, want, err)
|
||||||
@@ -146,7 +151,12 @@ func TestStartEventStream(t *testing.T) {
|
|||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
defer ctrl.Finish()
|
defer ctrl.Finish()
|
||||||
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
||||||
grpcClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
grpcClient := &grpcValidatorClient{
|
||||||
|
grpcClientManager: &grpcClientManager[eth.BeaconNodeValidatorClient]{
|
||||||
|
client: beaconNodeValidatorClient,
|
||||||
|
},
|
||||||
|
isEventStreamRunning: true,
|
||||||
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
topics []string
|
topics []string
|
||||||
|
|||||||
@@ -9,10 +9,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func NewNodeClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.NodeClient {
|
func NewNodeClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.NodeClient {
|
||||||
grpcClient := grpcApi.NewNodeClient(validatorConn.GetGrpcClientConn())
|
grpcClient := grpcApi.NewNodeClientWithConnection(validatorConn)
|
||||||
if features.Get().EnableBeaconRESTApi {
|
if features.Get().EnableBeaconRESTApi {
|
||||||
return beaconApi.NewNodeClientWithFallback(jsonRestHandler, grpcClient)
|
return beaconApi.NewNodeClientWithFallback(jsonRestHandler, grpcClient)
|
||||||
} else {
|
|
||||||
return grpcClient
|
|
||||||
}
|
}
|
||||||
|
return grpcClient
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -134,18 +134,34 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
|
|||||||
|
|
||||||
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
|
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
|
||||||
|
|
||||||
grpcConn, err := grpc.DialContext(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts...)
|
var grpcConn *grpc.ClientConn
|
||||||
if err != nil {
|
var grpcProvider validatorHelpers.GrpcConnectionProvider
|
||||||
return s, err
|
|
||||||
|
if cfg.BeaconNodeGRPCEndpoint != "" {
|
||||||
|
var err error
|
||||||
|
grpcProvider, err = validatorHelpers.NewGrpcConnectionProvider(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts)
|
||||||
|
if err != nil {
|
||||||
|
return s, errors.Wrap(err, "failed to create gRPC connection provider")
|
||||||
|
}
|
||||||
|
grpcConn = grpcProvider.CurrentConn()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.BeaconNodeCert != "" {
|
if cfg.BeaconNodeCert != "" {
|
||||||
log.Info("Established secure gRPC connection")
|
log.Info("Established secure gRPC connection")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
connOpts := []validatorHelpers.NodeConnectionOption{
|
||||||
|
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
|
||||||
|
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
|
||||||
|
}
|
||||||
|
if grpcProvider != nil {
|
||||||
|
connOpts = append(connOpts, validatorHelpers.WithGrpcConnectionProvider(grpcProvider))
|
||||||
|
}
|
||||||
|
|
||||||
s.conn = validatorHelpers.NewNodeConnection(
|
s.conn = validatorHelpers.NewNodeConnection(
|
||||||
grpcConn,
|
grpcConn,
|
||||||
cfg.BeaconApiEndpoint,
|
cfg.BeaconApiEndpoint,
|
||||||
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
|
connOpts...,
|
||||||
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
@@ -210,6 +226,7 @@ func (v *ValidatorService) Start() {
|
|||||||
graffitiOrderedIndex: graffitiOrderedIndex,
|
graffitiOrderedIndex: graffitiOrderedIndex,
|
||||||
beaconNodeHosts: hosts,
|
beaconNodeHosts: hosts,
|
||||||
currentHostIndex: 0,
|
currentHostIndex: 0,
|
||||||
|
grpcConnectionProvider: v.conn.GetGrpcConnectionProvider(),
|
||||||
validatorClient: validatorClient,
|
validatorClient: validatorClient,
|
||||||
chainClient: beaconChainClientFactory.NewChainClient(v.conn, restHandler),
|
chainClient: beaconChainClientFactory.NewChainClient(v.conn, restHandler),
|
||||||
nodeClient: nodeclientfactory.NewNodeClient(v.conn, restHandler),
|
nodeClient: nodeclientfactory.NewNodeClient(v.conn, restHandler),
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ func NewValidatorClient(
|
|||||||
) iface.ValidatorClient {
|
) iface.ValidatorClient {
|
||||||
if features.Get().EnableBeaconRESTApi {
|
if features.Get().EnableBeaconRESTApi {
|
||||||
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
|
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
|
||||||
} else {
|
|
||||||
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
|
|
||||||
}
|
}
|
||||||
|
return grpcApi.NewGrpcValidatorClientWithConnection(validatorConn)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ import (
|
|||||||
"github.com/OffchainLabs/prysm/v7/validator/db"
|
"github.com/OffchainLabs/prysm/v7/validator/db"
|
||||||
dbCommon "github.com/OffchainLabs/prysm/v7/validator/db/common"
|
dbCommon "github.com/OffchainLabs/prysm/v7/validator/db/common"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/graffiti"
|
"github.com/OffchainLabs/prysm/v7/validator/graffiti"
|
||||||
|
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
|
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
|
||||||
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
|
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
|
||||||
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
|
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
|
||||||
@@ -82,6 +83,7 @@ type validator struct {
|
|||||||
graffitiOrderedIndex uint64
|
graffitiOrderedIndex uint64
|
||||||
beaconNodeHosts []string
|
beaconNodeHosts []string
|
||||||
currentHostIndex uint64
|
currentHostIndex uint64
|
||||||
|
grpcConnectionProvider validatorHelpers.GrpcConnectionProvider
|
||||||
validatorClient iface.ValidatorClient
|
validatorClient iface.ValidatorClient
|
||||||
chainClient iface.ChainClient
|
chainClient iface.ChainClient
|
||||||
nodeClient iface.NodeClient
|
nodeClient iface.NodeClient
|
||||||
@@ -1261,15 +1263,35 @@ func (v *validator) Host() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (v *validator) changeHost() {
|
func (v *validator) changeHost() {
|
||||||
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
|
hosts := v.hosts()
|
||||||
|
if len(hosts) <= 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
next := (v.currentHostIndex + 1) % uint64(len(hosts))
|
||||||
log.WithFields(logrus.Fields{
|
log.WithFields(logrus.Fields{
|
||||||
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
|
"currentHost": hosts[v.currentHostIndex],
|
||||||
"nextHost": v.beaconNodeHosts[next],
|
"nextHost": hosts[next],
|
||||||
}).Warn("Beacon node is not responding, switching host")
|
}).Warn("Beacon node is not responding, switching host")
|
||||||
v.validatorClient.SetHost(v.beaconNodeHosts[next])
|
v.validatorClient.SetHost(hosts[next])
|
||||||
v.currentHostIndex = next
|
v.currentHostIndex = next
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hosts returns the list of configured beacon node hosts for failover.
|
||||||
|
func (v *validator) hosts() []string {
|
||||||
|
if features.Get().EnableBeaconRESTApi {
|
||||||
|
return v.beaconNodeHosts
|
||||||
|
}
|
||||||
|
if v.grpcConnectionProvider != nil {
|
||||||
|
return v.grpcConnectionProvider.Hosts()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// numHosts returns the number of configured beacon node hosts for failover.
|
||||||
|
func (v *validator) numHosts() int {
|
||||||
|
return len(v.hosts())
|
||||||
|
}
|
||||||
|
|
||||||
func (v *validator) FindHealthyHost(ctx context.Context) bool {
|
func (v *validator) FindHealthyHost(ctx context.Context) bool {
|
||||||
// Tail-recursive closure keeps retry count private.
|
// Tail-recursive closure keeps retry count private.
|
||||||
var check func(remaining int) bool
|
var check func(remaining int) bool
|
||||||
@@ -1277,18 +1299,20 @@ func (v *validator) FindHealthyHost(ctx context.Context) bool {
|
|||||||
if v.nodeClient.IsReady(ctx) { // ready → done
|
if v.nodeClient.IsReady(ctx) { // ready → done
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {
|
log.WithField("host", v.Host()).Debug("Beacon node not fully synced")
|
||||||
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
|
|
||||||
return false
|
// Try next host if not the last iteration
|
||||||
|
if i < numHosts-1 {
|
||||||
|
v.changeHost()
|
||||||
}
|
}
|
||||||
if remaining == 0 || !features.Get().EnableBeaconRESTApi {
|
|
||||||
return false // exhausted or REST disabled
|
|
||||||
}
|
|
||||||
v.changeHost()
|
|
||||||
return check(remaining - 1) // recurse
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return check(len(v.beaconNodeHosts))
|
if numHosts == 1 {
|
||||||
|
log.WithField("host", v.Host()).Warn("Beacon node is not fully synced, no backup node configured")
|
||||||
|
} else {
|
||||||
|
log.Warn("No fully synced beacon node found")
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
|
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
|
||||||
|
|||||||
@@ -2792,6 +2792,10 @@ func TestValidator_Host(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestValidator_ChangeHost(t *testing.T) {
|
func TestValidator_ChangeHost(t *testing.T) {
|
||||||
|
// Enable REST API mode for this test since changeHost only calls SetHost in REST API mode
|
||||||
|
resetCfg := features.InitWithReset(&features.Flags{EnableBeaconRESTApi: true})
|
||||||
|
defer resetCfg()
|
||||||
|
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
defer ctrl.Finish()
|
defer ctrl.Finish()
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ go_library(
|
|||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"converts.go",
|
"converts.go",
|
||||||
|
"grpc_connection_provider.go",
|
||||||
"metadata.go",
|
"metadata.go",
|
||||||
"node_connection.go",
|
"node_connection.go",
|
||||||
],
|
],
|
||||||
@@ -15,6 +16,7 @@ go_library(
|
|||||||
"//validator/db/iface:go_default_library",
|
"//validator/db/iface:go_default_library",
|
||||||
"//validator/slashing-protection-history/format:go_default_library",
|
"//validator/slashing-protection-history/format:go_default_library",
|
||||||
"@com_github_pkg_errors//:go_default_library",
|
"@com_github_pkg_errors//:go_default_library",
|
||||||
|
"@com_github_sirupsen_logrus//:go_default_library",
|
||||||
"@org_golang_google_grpc//:go_default_library",
|
"@org_golang_google_grpc//:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|||||||
181
validator/helpers/grpc_connection_provider.go
Normal file
181
validator/helpers/grpc_connection_provider.go
Normal file
@@ -0,0 +1,181 @@
|
|||||||
|
package helpers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
pkgErrors "github.com/pkg/errors"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logrus.WithField("prefix", "helpers")
|
||||||
|
|
||||||
|
// GrpcConnectionProvider manages multiple gRPC connections for failover support.
|
||||||
|
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
|
||||||
|
type GrpcConnectionProvider interface {
|
||||||
|
// CurrentConn returns the currently active gRPC connection.
|
||||||
|
// Returns nil if the provider has been closed.
|
||||||
|
CurrentConn() *grpc.ClientConn
|
||||||
|
// CurrentHost returns the address of the currently active endpoint.
|
||||||
|
CurrentHost() string
|
||||||
|
// Hosts returns all configured endpoint addresses.
|
||||||
|
Hosts() []string
|
||||||
|
// Conn returns the connection at the given index.
|
||||||
|
Conn(index int) *grpc.ClientConn
|
||||||
|
// SetHost switches to the endpoint at the given index.
|
||||||
|
SetHost(index int) error
|
||||||
|
// NextHost switches to the next available endpoint in round-robin fashion.
|
||||||
|
NextHost()
|
||||||
|
// Close closes all managed connections.
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
type grpcConnectionProvider struct {
|
||||||
|
// Immutable after construction - no lock needed for reads
|
||||||
|
endpoints []string
|
||||||
|
connections []*grpc.ClientConn
|
||||||
|
|
||||||
|
// Atomic index for lock-free current endpoint access
|
||||||
|
currentIndex atomic.Uint64
|
||||||
|
|
||||||
|
// Mutex only for Close() and write operations that need log consistency
|
||||||
|
mu sync.Mutex
|
||||||
|
closed atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewGrpcConnectionProvider creates a new connection provider that manages multiple gRPC connections.
|
||||||
|
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
|
||||||
|
// It creates a separate connection for each endpoint using the provided dial options.
|
||||||
|
func NewGrpcConnectionProvider(
|
||||||
|
ctx context.Context,
|
||||||
|
endpoint string,
|
||||||
|
dialOpts []grpc.DialOption,
|
||||||
|
) (GrpcConnectionProvider, error) {
|
||||||
|
endpoints := parseEndpoints(endpoint)
|
||||||
|
if len(endpoints) == 0 {
|
||||||
|
return nil, pkgErrors.New("no gRPC endpoints provided")
|
||||||
|
}
|
||||||
|
|
||||||
|
connections := make([]*grpc.ClientConn, 0, len(endpoints))
|
||||||
|
for _, ep := range endpoints {
|
||||||
|
conn, err := grpc.DialContext(ctx, ep, dialOpts...)
|
||||||
|
if err != nil {
|
||||||
|
// Clean up already created connections
|
||||||
|
for _, c := range connections {
|
||||||
|
if err := c.Close(); err != nil {
|
||||||
|
log.WithError(err).Warn("Failed to close connection during cleanup")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, pkgErrors.Wrapf(err, "failed to connect to gRPC endpoint %s", ep)
|
||||||
|
}
|
||||||
|
connections = append(connections, conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.WithFields(logrus.Fields{
|
||||||
|
"endpoints": endpoints,
|
||||||
|
"count": len(endpoints),
|
||||||
|
}).Info("Initialized gRPC connection provider with multiple endpoints")
|
||||||
|
|
||||||
|
return &grpcConnectionProvider{
|
||||||
|
endpoints: endpoints,
|
||||||
|
connections: connections,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
|
||||||
|
func parseEndpoints(endpoint string) []string {
|
||||||
|
if endpoint == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var endpoints []string
|
||||||
|
for p := range strings.SplitSeq(endpoint, ",") {
|
||||||
|
if p = strings.TrimSpace(p); p != "" {
|
||||||
|
endpoints = append(endpoints, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return endpoints
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
|
||||||
|
if p.closed.Load() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
idx := p.currentIndex.Load() % uint64(len(p.connections))
|
||||||
|
return p.connections[idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) CurrentHost() string {
|
||||||
|
idx := p.currentIndex.Load() % uint64(len(p.endpoints))
|
||||||
|
return p.endpoints[idx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) Hosts() []string {
|
||||||
|
// Return a copy to maintain immutability
|
||||||
|
hosts := make([]string, len(p.endpoints))
|
||||||
|
copy(hosts, p.endpoints)
|
||||||
|
return hosts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) Conn(index int) *grpc.ClientConn {
|
||||||
|
if p.closed.Load() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if index < 0 || index >= len(p.connections) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return p.connections[index]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) SetHost(index int) error {
|
||||||
|
if index < 0 || index >= len(p.endpoints) {
|
||||||
|
return pkgErrors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
oldIdx := p.currentIndex.Load()
|
||||||
|
p.currentIndex.Store(uint64(index))
|
||||||
|
|
||||||
|
log.WithFields(logrus.Fields{
|
||||||
|
"previousHost": p.endpoints[oldIdx%uint64(len(p.endpoints))],
|
||||||
|
"newHost": p.endpoints[index],
|
||||||
|
}).Info("Switched gRPC endpoint")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) NextHost() {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
oldIdx := p.currentIndex.Load()
|
||||||
|
newIdx := (oldIdx + 1) % uint64(len(p.endpoints))
|
||||||
|
p.currentIndex.Store(newIdx)
|
||||||
|
|
||||||
|
log.WithFields(logrus.Fields{
|
||||||
|
"previousHost": p.endpoints[oldIdx],
|
||||||
|
"newHost": p.endpoints[newIdx],
|
||||||
|
}).Debug("Switched to next gRPC endpoint")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *grpcConnectionProvider) Close() error {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
|
||||||
|
if p.closed.Load() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
p.closed.Store(true)
|
||||||
|
|
||||||
|
var errs []error
|
||||||
|
for i, conn := range p.connections {
|
||||||
|
if err := conn.Close(); err != nil {
|
||||||
|
errs = append(errs, pkgErrors.Wrapf(err, "failed to close connection to %s", p.endpoints[i]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return errors.Join(errs...)
|
||||||
|
}
|
||||||
@@ -14,14 +14,19 @@ type NodeConnection interface {
|
|||||||
setBeaconApiHeaders(map[string][]string)
|
setBeaconApiHeaders(map[string][]string)
|
||||||
GetBeaconApiTimeout() time.Duration
|
GetBeaconApiTimeout() time.Duration
|
||||||
setBeaconApiTimeout(time.Duration)
|
setBeaconApiTimeout(time.Duration)
|
||||||
|
// GetGrpcConnectionProvider returns the gRPC connection provider for multi-endpoint support.
|
||||||
|
// Returns nil if no provider is configured (single endpoint mode).
|
||||||
|
GetGrpcConnectionProvider() GrpcConnectionProvider
|
||||||
|
setGrpcConnectionProvider(GrpcConnectionProvider)
|
||||||
dummy()
|
dummy()
|
||||||
}
|
}
|
||||||
|
|
||||||
type nodeConnection struct {
|
type nodeConnection struct {
|
||||||
grpcClientConn *grpc.ClientConn
|
grpcClientConn *grpc.ClientConn
|
||||||
beaconApiUrl string
|
grpcConnectionProvider GrpcConnectionProvider
|
||||||
beaconApiHeaders map[string][]string
|
beaconApiUrl string
|
||||||
beaconApiTimeout time.Duration
|
beaconApiHeaders map[string][]string
|
||||||
|
beaconApiTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeConnectionOption is a functional option for configuring the node connection.
|
// NodeConnectionOption is a functional option for configuring the node connection.
|
||||||
@@ -41,7 +46,18 @@ func WithBeaconApiTimeout(timeout time.Duration) NodeConnectionOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithGrpcConnectionProvider sets the gRPC connection provider for multi-endpoint support.
|
||||||
|
func WithGrpcConnectionProvider(provider GrpcConnectionProvider) NodeConnectionOption {
|
||||||
|
return func(nc NodeConnection) {
|
||||||
|
nc.setGrpcConnectionProvider(provider)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *nodeConnection) GetGrpcClientConn() *grpc.ClientConn {
|
func (c *nodeConnection) GetGrpcClientConn() *grpc.ClientConn {
|
||||||
|
// If a connection provider is configured, use its current connection
|
||||||
|
if c.grpcConnectionProvider != nil {
|
||||||
|
return c.grpcConnectionProvider.CurrentConn()
|
||||||
|
}
|
||||||
return c.grpcClientConn
|
return c.grpcClientConn
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,6 +81,14 @@ func (c *nodeConnection) setBeaconApiTimeout(timeout time.Duration) {
|
|||||||
c.beaconApiTimeout = timeout
|
c.beaconApiTimeout = timeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *nodeConnection) GetGrpcConnectionProvider() GrpcConnectionProvider {
|
||||||
|
return c.grpcConnectionProvider
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *nodeConnection) setGrpcConnectionProvider(provider GrpcConnectionProvider) {
|
||||||
|
c.grpcConnectionProvider = provider
|
||||||
|
}
|
||||||
|
|
||||||
func (*nodeConnection) dummy() {}
|
func (*nodeConnection) dummy() {}
|
||||||
|
|
||||||
func NewNodeConnection(grpcConn *grpc.ClientConn, beaconApiUrl string, opts ...NodeConnectionOption) NodeConnection {
|
func NewNodeConnection(grpcConn *grpc.ClientConn, beaconApiUrl string, opts ...NodeConnectionOption) NodeConnection {
|
||||||
|
|||||||
Reference in New Issue
Block a user