Compare commits

...

3 Commits

Author SHA1 Message Date
terence tsao
897ea6412f Add profile to sync block path 2024-06-10 09:27:53 -07:00
Radosław Kapka
8413660d5f Keep only the latest value in the health channel (#14087)
* Increase health tracker channel buffer size

* keep only the latest value

* Make health test blocking as a regression test for PR #14807

* Fix new race conditions in the MockHealthClient

---------

Co-authored-by: Preston Van Loon <preston@pvl.dev>
2024-06-06 18:45:35 +00:00
Radosław Kapka
e037491756 Deprectate EnableDebugRPCEndpoints flag (#14015)
* Deprectate `EnableDebugRPCEndpoints` flag

* test fix

* add flag to deprecated list

* disable by default

* test fixes
2024-06-05 12:04:21 +00:00
11 changed files with 68 additions and 22 deletions

View File

@@ -48,8 +48,13 @@ func (n *NodeHealthTracker) CheckHealth(ctx context.Context) bool {
if isStatusChanged {
// Update the health status
n.isHealthy = &newStatus
// Send the new status to the health channel
n.healthChan <- newStatus
// Send the new status to the health channel, potentially overwriting the existing value
select {
case <-n.healthChan:
n.healthChan <- newStatus
default:
n.healthChan <- newStatus
}
}
return newStatus
}

View File

@@ -87,12 +87,6 @@ func TestNodeHealth_Concurrency(t *testing.T) {
// Number of goroutines to spawn for both reading and writing
numGoroutines := 6
go func() {
for range n.HealthUpdates() {
// Consume values to avoid blocking on channel send.
}
}()
wg.Add(numGoroutines * 2) // for readers and writers
// Concurrently update health status

View File

@@ -3,6 +3,7 @@ package testing
import (
"context"
"reflect"
"sync"
"github.com/prysmaticlabs/prysm/v5/api/client/beacon/iface"
"go.uber.org/mock/gomock"
@@ -16,6 +17,7 @@ var (
type MockHealthClient struct {
ctrl *gomock.Controller
recorder *MockHealthClientMockRecorder
sync.Mutex
}
// MockHealthClientMockRecorder is the mock recorder for MockHealthClient.
@@ -25,6 +27,8 @@ type MockHealthClientMockRecorder struct {
// IsHealthy mocks base method.
func (m *MockHealthClient) IsHealthy(arg0 context.Context) bool {
m.Lock()
defer m.Unlock()
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsHealthy", arg0)
ret0, ok := ret[0].(bool)
@@ -41,6 +45,8 @@ func (m *MockHealthClient) EXPECT() *MockHealthClientMockRecorder {
// IsHealthy indicates an expected call of IsHealthy.
func (mr *MockHealthClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
mr.mock.Lock()
defer mr.mock.Unlock()
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockHealthClient)(nil).IsHealthy), arg0)
}

View File

@@ -965,9 +965,8 @@ func (b *BeaconNode) registerRPCService(router *mux.Router) error {
cert := b.cliCtx.String(flags.CertFlag.Name)
key := b.cliCtx.String(flags.KeyFlag.Name)
mockEth1DataVotes := b.cliCtx.Bool(flags.InteropMockEth1DataVotesFlag.Name)
maxMsgSize := b.cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name)
enableDebugRPCEndpoints := !b.cliCtx.Bool(flags.DisableDebugRPCEndpoints.Name)
p2pService := b.fetchP2P()
rpcService := rpc.NewService(b.ctx, &rpc.Config{
@@ -1056,11 +1055,10 @@ func (b *BeaconNode) registerGRPCGateway(router *mux.Router) error {
gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name)
rpcHost := b.cliCtx.String(flags.RPCHost.Name)
rpcPort := b.cliCtx.Int(flags.RPCPort.Name)
enableDebugRPCEndpoints := !b.cliCtx.Bool(flags.DisableDebugRPCEndpoints.Name)
selfAddress := net.JoinHostPort(rpcHost, strconv.Itoa(rpcPort))
gatewayAddress := net.JoinHostPort(gatewayHost, strconv.Itoa(gatewayPort))
allowedOrigins := strings.Split(b.cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",")
enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name)
selfCert := b.cliCtx.String(flags.CertFlag.Name)
maxCallSize := b.cliCtx.Uint64(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
httpModules := b.cliCtx.String(flags.HTTPModules.Name)

View File

@@ -113,8 +113,8 @@ type Config struct {
ExecutionChainInfoFetcher execution.ChainInfoFetcher
GenesisTimeFetcher blockchain.TimeFetcher
GenesisFetcher blockchain.GenesisFetcher
EnableDebugRPCEndpoints bool
MockEth1Votes bool
EnableDebugRPCEndpoints bool
AttestationsPool attestations.Pool
ExitPool voluntaryexits.PoolManager
SlashingsPool slashings.PoolManager
@@ -318,7 +318,6 @@ func NewService(ctx context.Context, cfg *Config) *Service {
ethpbv1alpha1.RegisterHealthServer(s.grpcServer, nodeServer)
ethpbv1alpha1.RegisterBeaconChainServer(s.grpcServer, beaconChainServer)
if s.cfg.EnableDebugRPCEndpoints {
log.Info("Enabled debug gRPC endpoints")
debugServer := &debugv1alpha1.Server{
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
BeaconDB: s.cfg.BeaconDB,

View File

@@ -3,6 +3,8 @@ package sync
import (
"context"
"fmt"
"os"
"runtime/pprof"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -35,10 +37,47 @@ var (
errRejectCommitmentLen = errors.New("[REJECT] The length of KZG commitments is less than or equal to the limitation defined in Consensus Layer")
)
func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
done := make(chan struct {
result pubsub.ValidationResult
err error
})
go func() {
result, err := func() (pubsub.ValidationResult, error) {
return s.validateBeaconBlockPubSubInternal(ctx, pid, msg)
}()
done <- struct {
result pubsub.ValidationResult
err error
}{result, err}
}()
select {
case res := <-done:
return res.result, res.err
case <-time.After(12 * time.Second):
filePath := "/tmp/pre-confirm-blocks.goroutine"
file, err := os.Create(filePath)
if err != nil {
log.WithError(err).Error("Could not create file for goroutine dump")
}
defer func() {
if err := file.Close(); err != nil {
log.WithError(err).Error("Could not close file for goroutine dump")
}
}()
if err := pprof.Lookup("goroutine").WriteTo(file, 2); err != nil {
log.WithError(err).Error("Could not write goroutine dump to file")
}
}
return pubsub.ValidationIgnore, nil
}
// validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature.
// Blocks that have already been seen are ignored. If the BLS signature is any valid signature,
// this method rebroadcasts the message.
func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
func (s *Service) validateBeaconBlockPubSubInternal(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
receivedTime := prysmTime.Now()
// Validation runs on publish (not just subscriptions), so we should approve any message from
// ourselves.

View File

@@ -174,10 +174,10 @@ var (
Usage: "The factor by which blob batch limit may increase on burst.",
Value: 2,
}
// EnableDebugRPCEndpoints as /v1/beacon/state.
EnableDebugRPCEndpoints = &cli.BoolFlag{
Name: "enable-debug-rpc-endpoints",
Usage: "Enables the debug rpc service, containing utility endpoints such as /eth/v1alpha1/beacon/state.",
// DisableDebugRPCEndpoints disables the debug Beacon API namespace.
DisableDebugRPCEndpoints = &cli.BoolFlag{
Name: "disable-debug-rpc-endpoints",
Usage: "Disables the debug Beacon API namespace.",
}
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
SubscribeToAllSubnets = &cli.BoolFlag{

View File

@@ -64,7 +64,7 @@ var appFlags = []cli.Flag{
flags.InteropNumValidatorsFlag,
flags.InteropGenesisTimeFlag,
flags.SlotsPerArchivedPoint,
flags.EnableDebugRPCEndpoints,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.HistoricalSlasherNode,
flags.ChainID,

View File

@@ -116,7 +116,7 @@ var appHelpFlagGroups = []flagGroup{
flags.BlockBatchLimitBurstFactor,
flags.BlobBatchLimit,
flags.BlobBatchLimitBurstFactor,
flags.EnableDebugRPCEndpoints,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.HistoricalSlasherNode,
flags.ChainID,

View File

@@ -52,6 +52,11 @@ var (
Usage: deprecatedUsage,
Hidden: true,
}
deprecatedEnableDebugRPCEndpoints = &cli.BoolFlag{
Name: "enable-debug-rpc-endpoints",
Usage: deprecatedUsage,
Hidden: true,
}
)
// Deprecated flags for both the beacon node and validator client.
@@ -65,6 +70,7 @@ var deprecatedFlags = []cli.Flag{
deprecatedEnableEIP4881,
deprecatedDisableEIP4881,
deprecatedVerboseSigVerification,
deprecatedEnableDebugRPCEndpoints,
}
// deprecatedBeaconFlags contains flags that are still used by other components

View File

@@ -275,7 +275,6 @@ func (node *BeaconNode) Start(ctx context.Context) error {
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",
"--" + cmdshared.ForceClearDB.Name,
"--" + cmdshared.AcceptTosFlag.Name,
"--" + flags.EnableDebugRPCEndpoints.Name,
"--" + features.EnableQUIC.Name,
}
if config.UsePprof {