mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Compare commits
9 Commits
dump-heap-
...
gRPC-fallb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f828bdd88 | ||
|
|
17413b52ed | ||
|
|
a651e7f0ac | ||
|
|
3e1cb45e92 | ||
|
|
fc2dcb0e88 | ||
|
|
888db581dd | ||
|
|
f1d2ee72e2 | ||
|
|
31f18b9f60 | ||
|
|
6462c997e9 |
@@ -10,18 +10,16 @@ import (
|
||||
)
|
||||
|
||||
func NewChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.ChainClient {
|
||||
grpcClient := grpcApi.NewGrpcChainClient(validatorConn.GetGrpcClientConn())
|
||||
grpcClient := grpcApi.NewGrpcChainClientWithConnection(validatorConn)
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return beaconApi.NewBeaconApiChainClientWithFallback(jsonRestHandler, grpcClient)
|
||||
} else {
|
||||
return grpcClient
|
||||
}
|
||||
return grpcClient
|
||||
}
|
||||
|
||||
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.PrysmChainClient {
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return beaconApi.NewPrysmChainClient(jsonRestHandler, nodeClientFactory.NewNodeClient(validatorConn, jsonRestHandler))
|
||||
} else {
|
||||
return grpcApi.NewGrpcPrysmChainClient(validatorConn.GetGrpcClientConn())
|
||||
}
|
||||
return grpcApi.NewGrpcPrysmChainClientWithConnection(validatorConn)
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"grpc_beacon_chain_client.go",
|
||||
"grpc_client_manager.go",
|
||||
"grpc_node_client.go",
|
||||
"grpc_prysm_beacon_chain_client.go",
|
||||
"grpc_validator_client.go",
|
||||
@@ -25,6 +26,7 @@ go_library(
|
||||
"//proto/eth/v1:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//validator/client/iface:go_default_library",
|
||||
"//validator/helpers:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes/empty",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
|
||||
@@ -5,38 +5,42 @@ import (
|
||||
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type grpcChainClient struct {
|
||||
beaconChainClient ethpb.BeaconChainClient
|
||||
*grpcClientManager[ethpb.BeaconChainClient]
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) ChainHead(ctx context.Context, in *empty.Empty) (*ethpb.ChainHead, error) {
|
||||
return c.beaconChainClient.GetChainHead(ctx, in)
|
||||
return c.getClient().GetChainHead(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) ValidatorBalances(ctx context.Context, in *ethpb.ListValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
|
||||
return c.beaconChainClient.ListValidatorBalances(ctx, in)
|
||||
return c.getClient().ListValidatorBalances(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) Validators(ctx context.Context, in *ethpb.ListValidatorsRequest) (*ethpb.Validators, error) {
|
||||
return c.beaconChainClient.ListValidators(ctx, in)
|
||||
return c.getClient().ListValidators(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) ValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error) {
|
||||
return c.beaconChainClient.GetValidatorQueue(ctx, in)
|
||||
return c.getClient().GetValidatorQueue(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) ValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error) {
|
||||
return c.beaconChainClient.GetValidatorPerformance(ctx, in)
|
||||
return c.getClient().GetValidatorPerformance(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcChainClient) ValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error) {
|
||||
return c.beaconChainClient.GetValidatorParticipation(ctx, in)
|
||||
return c.getClient().GetValidatorParticipation(ctx, in)
|
||||
}
|
||||
|
||||
func NewGrpcChainClient(cc grpc.ClientConnInterface) iface.ChainClient {
|
||||
return &grpcChainClient{ethpb.NewBeaconChainClient(cc)}
|
||||
// NewGrpcChainClientWithConnection creates a new gRPC chain client that supports
|
||||
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||
func NewGrpcChainClientWithConnection(conn validatorHelpers.NodeConnection) iface.ChainClient {
|
||||
return &grpcChainClient{
|
||||
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconChainClient),
|
||||
}
|
||||
}
|
||||
|
||||
61
validator/client/grpc-api/grpc_client_manager.go
Normal file
61
validator/client/grpc-api/grpc_client_manager.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package grpc_api
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// grpcClientManager handles dynamic gRPC client recreation when the connection changes.
|
||||
// It uses generics to work with any gRPC client type.
|
||||
type grpcClientManager[T any] struct {
|
||||
conn validatorHelpers.NodeConnection
|
||||
client T
|
||||
lastHost string
|
||||
clientMu sync.RWMutex
|
||||
newClient func(grpc.ClientConnInterface) T
|
||||
}
|
||||
|
||||
// newGrpcClientManager creates a new client manager with the given connection and client constructor.
|
||||
func newGrpcClientManager[T any](
|
||||
conn validatorHelpers.NodeConnection,
|
||||
newClient func(grpc.ClientConnInterface) T,
|
||||
) *grpcClientManager[T] {
|
||||
m := &grpcClientManager[T]{
|
||||
conn: conn,
|
||||
newClient: newClient,
|
||||
client: newClient(conn.GetGrpcClientConn()),
|
||||
}
|
||||
if provider := conn.GetGrpcConnectionProvider(); provider != nil {
|
||||
m.lastHost = provider.CurrentHost()
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// getClient returns the current client, recreating it if the connection has changed.
|
||||
func (m *grpcClientManager[T]) getClient() T {
|
||||
if m.conn == nil || m.conn.GetGrpcConnectionProvider() == nil {
|
||||
return m.client
|
||||
}
|
||||
|
||||
currentHost := m.conn.GetGrpcConnectionProvider().CurrentHost()
|
||||
m.clientMu.RLock()
|
||||
if m.lastHost == currentHost {
|
||||
client := m.client
|
||||
m.clientMu.RUnlock()
|
||||
return client
|
||||
}
|
||||
m.clientMu.RUnlock()
|
||||
|
||||
// Connection changed, need to recreate client
|
||||
m.clientMu.Lock()
|
||||
defer m.clientMu.Unlock()
|
||||
// Double-check after acquiring write lock
|
||||
if m.lastHost == currentHost {
|
||||
return m.client
|
||||
}
|
||||
m.client = m.newClient(m.conn.GetGrpcClientConn())
|
||||
m.lastHost = currentHost
|
||||
return m.client
|
||||
}
|
||||
@@ -5,8 +5,8 @@ import (
|
||||
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -14,35 +14,48 @@ var (
|
||||
)
|
||||
|
||||
type grpcNodeClient struct {
|
||||
nodeClient ethpb.NodeClient
|
||||
*grpcClientManager[ethpb.NodeClient]
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
|
||||
return c.nodeClient.GetSyncStatus(ctx, in)
|
||||
return c.getClient().GetSyncStatus(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) {
|
||||
return c.nodeClient.GetGenesis(ctx, in)
|
||||
return c.getClient().GetGenesis(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) {
|
||||
return c.nodeClient.GetVersion(ctx, in)
|
||||
return c.getClient().GetVersion(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) {
|
||||
return c.nodeClient.ListPeers(ctx, in)
|
||||
return c.getClient().ListPeers(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
|
||||
_, err := c.nodeClient.GetHealth(ctx, ðpb.HealthRequest{})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to get health of node")
|
||||
log.WithError(err).Debug("Failed to get health of node")
|
||||
return false
|
||||
}
|
||||
// Then check sync status - we only want fully synced nodes
|
||||
syncStatus, err := c.getClient().GetSyncStatus(ctx, &empty.Empty{})
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Failed to get sync status of node")
|
||||
return false
|
||||
}
|
||||
if syncStatus.Syncing {
|
||||
log.Debug("Node is syncing, not fully synced")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
|
||||
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
|
||||
return g
|
||||
// NewNodeClientWithConnection creates a new gRPC node client that supports
|
||||
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||
func NewNodeClientWithConnection(conn validatorHelpers.NodeConnection) iface.NodeClient {
|
||||
return &grpcNodeClient{
|
||||
grpcClientManager: newGrpcClientManager(conn, ethpb.NewNodeClient),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,9 +12,9 @@ import (
|
||||
eth "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type grpcPrysmChainClient struct {
|
||||
@@ -95,6 +95,8 @@ func (c *grpcPrysmChainClient) ValidatorPerformance(ctx context.Context, in *eth
|
||||
return c.chainClient.ValidatorPerformance(ctx, in)
|
||||
}
|
||||
|
||||
func NewGrpcPrysmChainClient(cc grpc.ClientConnInterface) iface.PrysmChainClient {
|
||||
return &grpcPrysmChainClient{chainClient: &grpcChainClient{ethpb.NewBeaconChainClient(cc)}}
|
||||
// NewGrpcPrysmChainClientWithConnection creates a new gRPC Prysm chain client that supports
|
||||
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||
func NewGrpcPrysmChainClientWithConnection(conn validatorHelpers.NodeConnection) iface.PrysmChainClient {
|
||||
return &grpcPrysmChainClient{chainClient: NewGrpcChainClientWithConnection(conn)}
|
||||
}
|
||||
|
||||
@@ -14,24 +14,24 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
type grpcValidatorClient struct {
|
||||
beaconNodeValidatorClient ethpb.BeaconNodeValidatorClient
|
||||
isEventStreamRunning bool
|
||||
*grpcClientManager[ethpb.BeaconNodeValidatorClient]
|
||||
isEventStreamRunning bool
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
||||
if features.Get().DisableDutiesV2 {
|
||||
return c.getDuties(ctx, in)
|
||||
}
|
||||
dutiesResponse, err := c.beaconNodeValidatorClient.GetDutiesV2(ctx, in)
|
||||
dutiesResponse, err := c.getClient().GetDutiesV2(ctx, in)
|
||||
if err != nil {
|
||||
if status.Code(err) == codes.Unimplemented {
|
||||
log.Warn("GetDutiesV2 returned status code unavailable, falling back to GetDuties")
|
||||
@@ -47,7 +47,7 @@ func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesReques
|
||||
|
||||
// getDuties is calling the v1 of get duties
|
||||
func (c *grpcValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
|
||||
dutiesResponse, err := c.beaconNodeValidatorClient.GetDuties(ctx, in)
|
||||
dutiesResponse, err := c.getClient().GetDuties(ctx, in)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(
|
||||
client.ErrConnectionIssue,
|
||||
@@ -147,108 +147,108 @@ func toValidatorDutyV2(duty *ethpb.DutiesV2Response_Duty) (*ethpb.ValidatorDuty,
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
|
||||
return c.beaconNodeValidatorClient.CheckDoppelGanger(ctx, in)
|
||||
return c.getClient().CheckDoppelGanger(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) {
|
||||
return c.beaconNodeValidatorClient.DomainData(ctx, in)
|
||||
return c.getClient().DomainData(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
|
||||
return c.beaconNodeValidatorClient.GetAttestationData(ctx, in)
|
||||
return c.getClient().GetAttestationData(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
|
||||
return c.beaconNodeValidatorClient.GetBeaconBlock(ctx, in)
|
||||
return c.getClient().GetBeaconBlock(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) FeeRecipientByPubKey(ctx context.Context, in *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
|
||||
return c.beaconNodeValidatorClient.GetFeeRecipientByPubKey(ctx, in)
|
||||
return c.getClient().GetFeeRecipientByPubKey(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
|
||||
return c.beaconNodeValidatorClient.GetSyncCommitteeContribution(ctx, in)
|
||||
return c.getClient().GetSyncCommitteeContribution(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
|
||||
return c.beaconNodeValidatorClient.GetSyncMessageBlockRoot(ctx, in)
|
||||
return c.getClient().GetSyncMessageBlockRoot(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
|
||||
return c.beaconNodeValidatorClient.GetSyncSubcommitteeIndex(ctx, in)
|
||||
return c.getClient().GetSyncSubcommitteeIndex(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
|
||||
return c.beaconNodeValidatorClient.MultipleValidatorStatus(ctx, in)
|
||||
return c.getClient().MultipleValidatorStatus(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.PrepareBeaconProposer(ctx, in)
|
||||
return c.getClient().PrepareBeaconProposer(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ProposeAttestation(ctx, in)
|
||||
return c.getClient().ProposeAttestation(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ProposeAttestationElectra(ctx context.Context, in *ethpb.SingleAttestation) (*ethpb.AttestResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ProposeAttestationElectra(ctx, in)
|
||||
return c.getClient().ProposeAttestationElectra(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ProposeBeaconBlock(ctx, in)
|
||||
return c.getClient().ProposeBeaconBlock(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ProposeExit(ctx, in)
|
||||
return c.getClient().ProposeExit(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
|
||||
return c.beaconNodeValidatorClient.StreamBlocksAltair(ctx, in)
|
||||
return c.getClient().StreamBlocksAltair(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionResponse, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProof(ctx, in)
|
||||
return c.getClient().SubmitAggregateSelectionProof(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitAggregateSelectionProofElectra(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionElectraResponse, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProofElectra(ctx, in)
|
||||
return c.getClient().SubmitAggregateSelectionProofElectra(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProof(ctx, in)
|
||||
return c.getClient().SubmitSignedAggregateSelectionProof(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProofElectra(ctx, in)
|
||||
return c.getClient().SubmitSignedAggregateSelectionProofElectra(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitSignedContributionAndProof(ctx, in)
|
||||
return c.getClient().SubmitSignedContributionAndProof(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitSyncMessage(ctx, in)
|
||||
return c.getClient().SubmitSyncMessage(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in)
|
||||
return c.getClient().SubmitValidatorRegistrations(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in)
|
||||
return c.getClient().SubscribeCommitteeSubnets(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ValidatorIndex(ctx, in)
|
||||
return c.getClient().ValidatorIndex(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
|
||||
return c.beaconNodeValidatorClient.ValidatorStatus(ctx, in)
|
||||
return c.getClient().ValidatorStatus(ctx, in)
|
||||
}
|
||||
|
||||
// Deprecated: Do not use.
|
||||
func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) {
|
||||
stream, err := c.beaconNodeValidatorClient.WaitForChainStart(ctx, in)
|
||||
stream, err := c.getClient().WaitForChainStart(ctx, in)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(
|
||||
client.ErrConnectionIssue,
|
||||
@@ -260,13 +260,13 @@ func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.E
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) AssignValidatorToSubnet(ctx context.Context, in *ethpb.AssignValidatorToSubnetRequest) (*empty.Empty, error) {
|
||||
return c.beaconNodeValidatorClient.AssignValidatorToSubnet(ctx, in)
|
||||
return c.getClient().AssignValidatorToSubnet(ctx, in)
|
||||
}
|
||||
func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
|
||||
ctx context.Context,
|
||||
in *ethpb.AggregatedSigAndAggregationBitsRequest,
|
||||
) (*ethpb.AggregatedSigAndAggregationBitsResponse, error) {
|
||||
return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in)
|
||||
return c.getClient().AggregatedSigAndAggregationBits(ctx, in)
|
||||
}
|
||||
|
||||
func (*grpcValidatorClient) AggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
|
||||
@@ -277,8 +277,12 @@ func (*grpcValidatorClient) AggregatedSyncSelections(context.Context, []iface.Sy
|
||||
return nil, iface.ErrNotSupported
|
||||
}
|
||||
|
||||
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
|
||||
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc), false}
|
||||
// NewGrpcValidatorClientWithConnection creates a new gRPC validator client that supports
|
||||
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
|
||||
func NewGrpcValidatorClientWithConnection(conn validatorHelpers.NodeConnection) iface.ValidatorClient {
|
||||
return &grpcValidatorClient{
|
||||
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconNodeValidatorClient),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
|
||||
@@ -308,7 +312,7 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||
}
|
||||
|
||||
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
stream, err := c.getClient().StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
@@ -374,11 +378,29 @@ func (c *grpcValidatorClient) EventStreamIsRunning() bool {
|
||||
return c.isEventStreamRunning
|
||||
}
|
||||
|
||||
func (*grpcValidatorClient) Host() string {
|
||||
log.Warn(iface.ErrNotSupported)
|
||||
return ""
|
||||
func (c *grpcValidatorClient) Host() string {
|
||||
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil || c.grpcClientManager.conn.GetGrpcConnectionProvider() == nil {
|
||||
return ""
|
||||
}
|
||||
return c.grpcClientManager.conn.GetGrpcConnectionProvider().CurrentHost()
|
||||
}
|
||||
|
||||
func (*grpcValidatorClient) SetHost(_ string) {
|
||||
log.Warn(iface.ErrNotSupported)
|
||||
func (c *grpcValidatorClient) SetHost(host string) {
|
||||
if c.grpcClientManager == nil || c.grpcClientManager.conn == nil {
|
||||
return
|
||||
}
|
||||
provider := c.grpcClientManager.conn.GetGrpcConnectionProvider()
|
||||
if provider == nil {
|
||||
return
|
||||
}
|
||||
// Find the index of the requested host and switch to it
|
||||
for i, h := range provider.Hosts() {
|
||||
if h == host {
|
||||
if err := provider.SetHost(i); err != nil {
|
||||
log.WithError(err).WithField("host", host).Error("Failed to set gRPC host")
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
log.WithField("host", host).Warn("Requested gRPC host not found in configured endpoints")
|
||||
}
|
||||
|
||||
@@ -133,7 +133,12 @@ func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
|
||||
gomock.Any(),
|
||||
).Return(nil, errors.New("failed stream"))
|
||||
|
||||
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
||||
validatorClient := &grpcValidatorClient{
|
||||
grpcClientManager: &grpcClientManager[eth.BeaconNodeValidatorClient]{
|
||||
client: beaconNodeValidatorClient,
|
||||
},
|
||||
isEventStreamRunning: true,
|
||||
}
|
||||
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
|
||||
want := "could not setup beacon chain ChainStart streaming client"
|
||||
assert.ErrorContains(t, want, err)
|
||||
@@ -146,7 +151,12 @@ func TestStartEventStream(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
||||
grpcClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
||||
grpcClient := &grpcValidatorClient{
|
||||
grpcClientManager: &grpcClientManager[eth.BeaconNodeValidatorClient]{
|
||||
client: beaconNodeValidatorClient,
|
||||
},
|
||||
isEventStreamRunning: true,
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
topics []string
|
||||
|
||||
@@ -9,10 +9,9 @@ import (
|
||||
)
|
||||
|
||||
func NewNodeClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.NodeClient {
|
||||
grpcClient := grpcApi.NewNodeClient(validatorConn.GetGrpcClientConn())
|
||||
grpcClient := grpcApi.NewNodeClientWithConnection(validatorConn)
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return beaconApi.NewNodeClientWithFallback(jsonRestHandler, grpcClient)
|
||||
} else {
|
||||
return grpcClient
|
||||
}
|
||||
return grpcClient
|
||||
}
|
||||
|
||||
@@ -134,18 +134,34 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
|
||||
|
||||
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
|
||||
|
||||
grpcConn, err := grpc.DialContext(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts...)
|
||||
if err != nil {
|
||||
return s, err
|
||||
var grpcConn *grpc.ClientConn
|
||||
var grpcProvider validatorHelpers.GrpcConnectionProvider
|
||||
|
||||
if cfg.BeaconNodeGRPCEndpoint != "" {
|
||||
var err error
|
||||
grpcProvider, err = validatorHelpers.NewGrpcConnectionProvider(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts)
|
||||
if err != nil {
|
||||
return s, errors.Wrap(err, "failed to create gRPC connection provider")
|
||||
}
|
||||
grpcConn = grpcProvider.CurrentConn()
|
||||
}
|
||||
|
||||
if cfg.BeaconNodeCert != "" {
|
||||
log.Info("Established secure gRPC connection")
|
||||
}
|
||||
|
||||
connOpts := []validatorHelpers.NodeConnectionOption{
|
||||
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
|
||||
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
|
||||
}
|
||||
if grpcProvider != nil {
|
||||
connOpts = append(connOpts, validatorHelpers.WithGrpcConnectionProvider(grpcProvider))
|
||||
}
|
||||
|
||||
s.conn = validatorHelpers.NewNodeConnection(
|
||||
grpcConn,
|
||||
cfg.BeaconApiEndpoint,
|
||||
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
|
||||
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
|
||||
connOpts...,
|
||||
)
|
||||
|
||||
return s, nil
|
||||
@@ -210,6 +226,7 @@ func (v *ValidatorService) Start() {
|
||||
graffitiOrderedIndex: graffitiOrderedIndex,
|
||||
beaconNodeHosts: hosts,
|
||||
currentHostIndex: 0,
|
||||
grpcConnectionProvider: v.conn.GetGrpcConnectionProvider(),
|
||||
validatorClient: validatorClient,
|
||||
chainClient: beaconChainClientFactory.NewChainClient(v.conn, restHandler),
|
||||
nodeClient: nodeclientfactory.NewNodeClient(v.conn, restHandler),
|
||||
|
||||
@@ -15,7 +15,6 @@ func NewValidatorClient(
|
||||
) iface.ValidatorClient {
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
|
||||
} else {
|
||||
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
|
||||
}
|
||||
return grpcApi.NewGrpcValidatorClientWithConnection(validatorConn)
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/validator/db"
|
||||
dbCommon "github.com/OffchainLabs/prysm/v7/validator/db/common"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/graffiti"
|
||||
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
|
||||
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
|
||||
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
|
||||
@@ -82,6 +83,7 @@ type validator struct {
|
||||
graffitiOrderedIndex uint64
|
||||
beaconNodeHosts []string
|
||||
currentHostIndex uint64
|
||||
grpcConnectionProvider validatorHelpers.GrpcConnectionProvider
|
||||
validatorClient iface.ValidatorClient
|
||||
chainClient iface.ChainClient
|
||||
nodeClient iface.NodeClient
|
||||
@@ -1261,15 +1263,35 @@ func (v *validator) Host() string {
|
||||
}
|
||||
|
||||
func (v *validator) changeHost() {
|
||||
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
|
||||
hosts := v.hosts()
|
||||
if len(hosts) <= 1 {
|
||||
return
|
||||
}
|
||||
next := (v.currentHostIndex + 1) % uint64(len(hosts))
|
||||
log.WithFields(logrus.Fields{
|
||||
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
|
||||
"nextHost": v.beaconNodeHosts[next],
|
||||
"currentHost": hosts[v.currentHostIndex],
|
||||
"nextHost": hosts[next],
|
||||
}).Warn("Beacon node is not responding, switching host")
|
||||
v.validatorClient.SetHost(v.beaconNodeHosts[next])
|
||||
v.validatorClient.SetHost(hosts[next])
|
||||
v.currentHostIndex = next
|
||||
}
|
||||
|
||||
// hosts returns the list of configured beacon node hosts for failover.
|
||||
func (v *validator) hosts() []string {
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
return v.beaconNodeHosts
|
||||
}
|
||||
if v.grpcConnectionProvider != nil {
|
||||
return v.grpcConnectionProvider.Hosts()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// numHosts returns the number of configured beacon node hosts for failover.
|
||||
func (v *validator) numHosts() int {
|
||||
return len(v.hosts())
|
||||
}
|
||||
|
||||
func (v *validator) FindHealthyHost(ctx context.Context) bool {
|
||||
// Tail-recursive closure keeps retry count private.
|
||||
var check func(remaining int) bool
|
||||
@@ -1277,18 +1299,20 @@ func (v *validator) FindHealthyHost(ctx context.Context) bool {
|
||||
if v.nodeClient.IsReady(ctx) { // ready → done
|
||||
return true
|
||||
}
|
||||
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {
|
||||
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
|
||||
return false
|
||||
log.WithField("host", v.Host()).Debug("Beacon node not fully synced")
|
||||
|
||||
// Try next host if not the last iteration
|
||||
if i < numHosts-1 {
|
||||
v.changeHost()
|
||||
}
|
||||
if remaining == 0 || !features.Get().EnableBeaconRESTApi {
|
||||
return false // exhausted or REST disabled
|
||||
}
|
||||
v.changeHost()
|
||||
return check(remaining - 1) // recurse
|
||||
}
|
||||
|
||||
return check(len(v.beaconNodeHosts))
|
||||
if numHosts == 1 {
|
||||
log.WithField("host", v.Host()).Warn("Beacon node is not fully synced, no backup node configured")
|
||||
} else {
|
||||
log.Warn("No fully synced beacon node found")
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
|
||||
|
||||
@@ -2792,6 +2792,10 @@ func TestValidator_Host(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestValidator_ChangeHost(t *testing.T) {
|
||||
// Enable REST API mode for this test since changeHost only calls SetHost in REST API mode
|
||||
resetCfg := features.InitWithReset(&features.Flags{EnableBeaconRESTApi: true})
|
||||
defer resetCfg()
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"converts.go",
|
||||
"grpc_connection_provider.go",
|
||||
"metadata.go",
|
||||
"node_connection.go",
|
||||
],
|
||||
@@ -15,6 +16,7 @@ go_library(
|
||||
"//validator/db/iface:go_default_library",
|
||||
"//validator/slashing-protection-history/format:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@org_golang_google_grpc//:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
181
validator/helpers/grpc_connection_provider.go
Normal file
181
validator/helpers/grpc_connection_provider.go
Normal file
@@ -0,0 +1,181 @@
|
||||
package helpers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
pkgErrors "github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var log = logrus.WithField("prefix", "helpers")
|
||||
|
||||
// GrpcConnectionProvider manages multiple gRPC connections for failover support.
|
||||
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
|
||||
type GrpcConnectionProvider interface {
|
||||
// CurrentConn returns the currently active gRPC connection.
|
||||
// Returns nil if the provider has been closed.
|
||||
CurrentConn() *grpc.ClientConn
|
||||
// CurrentHost returns the address of the currently active endpoint.
|
||||
CurrentHost() string
|
||||
// Hosts returns all configured endpoint addresses.
|
||||
Hosts() []string
|
||||
// Conn returns the connection at the given index.
|
||||
Conn(index int) *grpc.ClientConn
|
||||
// SetHost switches to the endpoint at the given index.
|
||||
SetHost(index int) error
|
||||
// NextHost switches to the next available endpoint in round-robin fashion.
|
||||
NextHost()
|
||||
// Close closes all managed connections.
|
||||
Close() error
|
||||
}
|
||||
|
||||
type grpcConnectionProvider struct {
|
||||
// Immutable after construction - no lock needed for reads
|
||||
endpoints []string
|
||||
connections []*grpc.ClientConn
|
||||
|
||||
// Atomic index for lock-free current endpoint access
|
||||
currentIndex atomic.Uint64
|
||||
|
||||
// Mutex only for Close() and write operations that need log consistency
|
||||
mu sync.Mutex
|
||||
closed atomic.Bool
|
||||
}
|
||||
|
||||
// NewGrpcConnectionProvider creates a new connection provider that manages multiple gRPC connections.
|
||||
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
|
||||
// It creates a separate connection for each endpoint using the provided dial options.
|
||||
func NewGrpcConnectionProvider(
|
||||
ctx context.Context,
|
||||
endpoint string,
|
||||
dialOpts []grpc.DialOption,
|
||||
) (GrpcConnectionProvider, error) {
|
||||
endpoints := parseEndpoints(endpoint)
|
||||
if len(endpoints) == 0 {
|
||||
return nil, pkgErrors.New("no gRPC endpoints provided")
|
||||
}
|
||||
|
||||
connections := make([]*grpc.ClientConn, 0, len(endpoints))
|
||||
for _, ep := range endpoints {
|
||||
conn, err := grpc.DialContext(ctx, ep, dialOpts...)
|
||||
if err != nil {
|
||||
// Clean up already created connections
|
||||
for _, c := range connections {
|
||||
if err := c.Close(); err != nil {
|
||||
log.WithError(err).Warn("Failed to close connection during cleanup")
|
||||
}
|
||||
}
|
||||
return nil, pkgErrors.Wrapf(err, "failed to connect to gRPC endpoint %s", ep)
|
||||
}
|
||||
connections = append(connections, conn)
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"endpoints": endpoints,
|
||||
"count": len(endpoints),
|
||||
}).Info("Initialized gRPC connection provider with multiple endpoints")
|
||||
|
||||
return &grpcConnectionProvider{
|
||||
endpoints: endpoints,
|
||||
connections: connections,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
|
||||
func parseEndpoints(endpoint string) []string {
|
||||
if endpoint == "" {
|
||||
return nil
|
||||
}
|
||||
var endpoints []string
|
||||
for p := range strings.SplitSeq(endpoint, ",") {
|
||||
if p = strings.TrimSpace(p); p != "" {
|
||||
endpoints = append(endpoints, p)
|
||||
}
|
||||
}
|
||||
return endpoints
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
|
||||
if p.closed.Load() {
|
||||
return nil
|
||||
}
|
||||
idx := p.currentIndex.Load() % uint64(len(p.connections))
|
||||
return p.connections[idx]
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) CurrentHost() string {
|
||||
idx := p.currentIndex.Load() % uint64(len(p.endpoints))
|
||||
return p.endpoints[idx]
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) Hosts() []string {
|
||||
// Return a copy to maintain immutability
|
||||
hosts := make([]string, len(p.endpoints))
|
||||
copy(hosts, p.endpoints)
|
||||
return hosts
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) Conn(index int) *grpc.ClientConn {
|
||||
if p.closed.Load() {
|
||||
return nil
|
||||
}
|
||||
if index < 0 || index >= len(p.connections) {
|
||||
return nil
|
||||
}
|
||||
return p.connections[index]
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) SetHost(index int) error {
|
||||
if index < 0 || index >= len(p.endpoints) {
|
||||
return pkgErrors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
oldIdx := p.currentIndex.Load()
|
||||
p.currentIndex.Store(uint64(index))
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"previousHost": p.endpoints[oldIdx%uint64(len(p.endpoints))],
|
||||
"newHost": p.endpoints[index],
|
||||
}).Info("Switched gRPC endpoint")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) NextHost() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
oldIdx := p.currentIndex.Load()
|
||||
newIdx := (oldIdx + 1) % uint64(len(p.endpoints))
|
||||
p.currentIndex.Store(newIdx)
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"previousHost": p.endpoints[oldIdx],
|
||||
"newHost": p.endpoints[newIdx],
|
||||
}).Debug("Switched to next gRPC endpoint")
|
||||
}
|
||||
|
||||
func (p *grpcConnectionProvider) Close() error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.closed.Load() {
|
||||
return nil
|
||||
}
|
||||
p.closed.Store(true)
|
||||
|
||||
var errs []error
|
||||
for i, conn := range p.connections {
|
||||
if err := conn.Close(); err != nil {
|
||||
errs = append(errs, pkgErrors.Wrapf(err, "failed to close connection to %s", p.endpoints[i]))
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
@@ -14,14 +14,19 @@ type NodeConnection interface {
|
||||
setBeaconApiHeaders(map[string][]string)
|
||||
GetBeaconApiTimeout() time.Duration
|
||||
setBeaconApiTimeout(time.Duration)
|
||||
// GetGrpcConnectionProvider returns the gRPC connection provider for multi-endpoint support.
|
||||
// Returns nil if no provider is configured (single endpoint mode).
|
||||
GetGrpcConnectionProvider() GrpcConnectionProvider
|
||||
setGrpcConnectionProvider(GrpcConnectionProvider)
|
||||
dummy()
|
||||
}
|
||||
|
||||
type nodeConnection struct {
|
||||
grpcClientConn *grpc.ClientConn
|
||||
beaconApiUrl string
|
||||
beaconApiHeaders map[string][]string
|
||||
beaconApiTimeout time.Duration
|
||||
grpcClientConn *grpc.ClientConn
|
||||
grpcConnectionProvider GrpcConnectionProvider
|
||||
beaconApiUrl string
|
||||
beaconApiHeaders map[string][]string
|
||||
beaconApiTimeout time.Duration
|
||||
}
|
||||
|
||||
// NodeConnectionOption is a functional option for configuring the node connection.
|
||||
@@ -41,7 +46,18 @@ func WithBeaconApiTimeout(timeout time.Duration) NodeConnectionOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithGrpcConnectionProvider sets the gRPC connection provider for multi-endpoint support.
|
||||
func WithGrpcConnectionProvider(provider GrpcConnectionProvider) NodeConnectionOption {
|
||||
return func(nc NodeConnection) {
|
||||
nc.setGrpcConnectionProvider(provider)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *nodeConnection) GetGrpcClientConn() *grpc.ClientConn {
|
||||
// If a connection provider is configured, use its current connection
|
||||
if c.grpcConnectionProvider != nil {
|
||||
return c.grpcConnectionProvider.CurrentConn()
|
||||
}
|
||||
return c.grpcClientConn
|
||||
}
|
||||
|
||||
@@ -65,6 +81,14 @@ func (c *nodeConnection) setBeaconApiTimeout(timeout time.Duration) {
|
||||
c.beaconApiTimeout = timeout
|
||||
}
|
||||
|
||||
func (c *nodeConnection) GetGrpcConnectionProvider() GrpcConnectionProvider {
|
||||
return c.grpcConnectionProvider
|
||||
}
|
||||
|
||||
func (c *nodeConnection) setGrpcConnectionProvider(provider GrpcConnectionProvider) {
|
||||
c.grpcConnectionProvider = provider
|
||||
}
|
||||
|
||||
func (*nodeConnection) dummy() {}
|
||||
|
||||
func NewNodeConnection(grpcConn *grpc.ClientConn, beaconApiUrl string, opts ...NodeConnectionOption) NodeConnection {
|
||||
|
||||
Reference in New Issue
Block a user