mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
3 Commits
att-time
...
sync-valid
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
897ea6412f | ||
|
|
8413660d5f | ||
|
|
e037491756 |
@@ -48,8 +48,13 @@ func (n *NodeHealthTracker) CheckHealth(ctx context.Context) bool {
|
||||
if isStatusChanged {
|
||||
// Update the health status
|
||||
n.isHealthy = &newStatus
|
||||
// Send the new status to the health channel
|
||||
n.healthChan <- newStatus
|
||||
// Send the new status to the health channel, potentially overwriting the existing value
|
||||
select {
|
||||
case <-n.healthChan:
|
||||
n.healthChan <- newStatus
|
||||
default:
|
||||
n.healthChan <- newStatus
|
||||
}
|
||||
}
|
||||
return newStatus
|
||||
}
|
||||
|
||||
@@ -87,12 +87,6 @@ func TestNodeHealth_Concurrency(t *testing.T) {
|
||||
// Number of goroutines to spawn for both reading and writing
|
||||
numGoroutines := 6
|
||||
|
||||
go func() {
|
||||
for range n.HealthUpdates() {
|
||||
// Consume values to avoid blocking on channel send.
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Add(numGoroutines * 2) // for readers and writers
|
||||
|
||||
// Concurrently update health status
|
||||
|
||||
@@ -3,6 +3,7 @@ package testing
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon/iface"
|
||||
"go.uber.org/mock/gomock"
|
||||
@@ -16,6 +17,7 @@ var (
|
||||
type MockHealthClient struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockHealthClientMockRecorder
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// MockHealthClientMockRecorder is the mock recorder for MockHealthClient.
|
||||
@@ -25,6 +27,8 @@ type MockHealthClientMockRecorder struct {
|
||||
|
||||
// IsHealthy mocks base method.
|
||||
func (m *MockHealthClient) IsHealthy(arg0 context.Context) bool {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "IsHealthy", arg0)
|
||||
ret0, ok := ret[0].(bool)
|
||||
@@ -41,6 +45,8 @@ func (m *MockHealthClient) EXPECT() *MockHealthClientMockRecorder {
|
||||
|
||||
// IsHealthy indicates an expected call of IsHealthy.
|
||||
func (mr *MockHealthClientMockRecorder) IsHealthy(arg0 any) *gomock.Call {
|
||||
mr.mock.Lock()
|
||||
defer mr.mock.Unlock()
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHealthy", reflect.TypeOf((*MockHealthClient)(nil).IsHealthy), arg0)
|
||||
}
|
||||
|
||||
@@ -965,9 +965,8 @@ func (b *BeaconNode) registerRPCService(router *mux.Router) error {
|
||||
cert := b.cliCtx.String(flags.CertFlag.Name)
|
||||
key := b.cliCtx.String(flags.KeyFlag.Name)
|
||||
mockEth1DataVotes := b.cliCtx.Bool(flags.InteropMockEth1DataVotesFlag.Name)
|
||||
|
||||
maxMsgSize := b.cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
|
||||
enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name)
|
||||
enableDebugRPCEndpoints := !b.cliCtx.Bool(flags.DisableDebugRPCEndpoints.Name)
|
||||
|
||||
p2pService := b.fetchP2P()
|
||||
rpcService := rpc.NewService(b.ctx, &rpc.Config{
|
||||
@@ -1056,11 +1055,10 @@ func (b *BeaconNode) registerGRPCGateway(router *mux.Router) error {
|
||||
gatewayPort := b.cliCtx.Int(flags.GRPCGatewayPort.Name)
|
||||
rpcHost := b.cliCtx.String(flags.RPCHost.Name)
|
||||
rpcPort := b.cliCtx.Int(flags.RPCPort.Name)
|
||||
|
||||
enableDebugRPCEndpoints := !b.cliCtx.Bool(flags.DisableDebugRPCEndpoints.Name)
|
||||
selfAddress := net.JoinHostPort(rpcHost, strconv.Itoa(rpcPort))
|
||||
gatewayAddress := net.JoinHostPort(gatewayHost, strconv.Itoa(gatewayPort))
|
||||
allowedOrigins := strings.Split(b.cliCtx.String(flags.GPRCGatewayCorsDomain.Name), ",")
|
||||
enableDebugRPCEndpoints := b.cliCtx.Bool(flags.EnableDebugRPCEndpoints.Name)
|
||||
selfCert := b.cliCtx.String(flags.CertFlag.Name)
|
||||
maxCallSize := b.cliCtx.Uint64(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
|
||||
httpModules := b.cliCtx.String(flags.HTTPModules.Name)
|
||||
|
||||
@@ -113,8 +113,8 @@ type Config struct {
|
||||
ExecutionChainInfoFetcher execution.ChainInfoFetcher
|
||||
GenesisTimeFetcher blockchain.TimeFetcher
|
||||
GenesisFetcher blockchain.GenesisFetcher
|
||||
EnableDebugRPCEndpoints bool
|
||||
MockEth1Votes bool
|
||||
EnableDebugRPCEndpoints bool
|
||||
AttestationsPool attestations.Pool
|
||||
ExitPool voluntaryexits.PoolManager
|
||||
SlashingsPool slashings.PoolManager
|
||||
@@ -318,7 +318,6 @@ func NewService(ctx context.Context, cfg *Config) *Service {
|
||||
ethpbv1alpha1.RegisterHealthServer(s.grpcServer, nodeServer)
|
||||
ethpbv1alpha1.RegisterBeaconChainServer(s.grpcServer, beaconChainServer)
|
||||
if s.cfg.EnableDebugRPCEndpoints {
|
||||
log.Info("Enabled debug gRPC endpoints")
|
||||
debugServer := &debugv1alpha1.Server{
|
||||
GenesisTimeFetcher: s.cfg.GenesisTimeFetcher,
|
||||
BeaconDB: s.cfg.BeaconDB,
|
||||
|
||||
@@ -3,6 +3,8 @@ package sync
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
@@ -35,10 +37,47 @@ var (
|
||||
errRejectCommitmentLen = errors.New("[REJECT] The length of KZG commitments is less than or equal to the limitation defined in Consensus Layer")
|
||||
)
|
||||
|
||||
func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
done := make(chan struct {
|
||||
result pubsub.ValidationResult
|
||||
err error
|
||||
})
|
||||
|
||||
go func() {
|
||||
result, err := func() (pubsub.ValidationResult, error) {
|
||||
return s.validateBeaconBlockPubSubInternal(ctx, pid, msg)
|
||||
}()
|
||||
done <- struct {
|
||||
result pubsub.ValidationResult
|
||||
err error
|
||||
}{result, err}
|
||||
}()
|
||||
|
||||
select {
|
||||
case res := <-done:
|
||||
return res.result, res.err
|
||||
case <-time.After(12 * time.Second):
|
||||
filePath := "/tmp/pre-confirm-blocks.goroutine"
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not create file for goroutine dump")
|
||||
}
|
||||
defer func() {
|
||||
if err := file.Close(); err != nil {
|
||||
log.WithError(err).Error("Could not close file for goroutine dump")
|
||||
}
|
||||
}()
|
||||
if err := pprof.Lookup("goroutine").WriteTo(file, 2); err != nil {
|
||||
log.WithError(err).Error("Could not write goroutine dump to file")
|
||||
}
|
||||
}
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
|
||||
// validateBeaconBlockPubSub checks that the incoming block has a valid BLS signature.
|
||||
// Blocks that have already been seen are ignored. If the BLS signature is any valid signature,
|
||||
// this method rebroadcasts the message.
|
||||
func (s *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
func (s *Service) validateBeaconBlockPubSubInternal(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
receivedTime := prysmTime.Now()
|
||||
// Validation runs on publish (not just subscriptions), so we should approve any message from
|
||||
// ourselves.
|
||||
|
||||
@@ -174,10 +174,10 @@ var (
|
||||
Usage: "The factor by which blob batch limit may increase on burst.",
|
||||
Value: 2,
|
||||
}
|
||||
// EnableDebugRPCEndpoints as /v1/beacon/state.
|
||||
EnableDebugRPCEndpoints = &cli.BoolFlag{
|
||||
Name: "enable-debug-rpc-endpoints",
|
||||
Usage: "Enables the debug rpc service, containing utility endpoints such as /eth/v1alpha1/beacon/state.",
|
||||
// DisableDebugRPCEndpoints disables the debug Beacon API namespace.
|
||||
DisableDebugRPCEndpoints = &cli.BoolFlag{
|
||||
Name: "disable-debug-rpc-endpoints",
|
||||
Usage: "Disables the debug Beacon API namespace.",
|
||||
}
|
||||
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
|
||||
SubscribeToAllSubnets = &cli.BoolFlag{
|
||||
|
||||
@@ -64,7 +64,7 @@ var appFlags = []cli.Flag{
|
||||
flags.InteropNumValidatorsFlag,
|
||||
flags.InteropGenesisTimeFlag,
|
||||
flags.SlotsPerArchivedPoint,
|
||||
flags.EnableDebugRPCEndpoints,
|
||||
flags.DisableDebugRPCEndpoints,
|
||||
flags.SubscribeToAllSubnets,
|
||||
flags.HistoricalSlasherNode,
|
||||
flags.ChainID,
|
||||
|
||||
@@ -116,7 +116,7 @@ var appHelpFlagGroups = []flagGroup{
|
||||
flags.BlockBatchLimitBurstFactor,
|
||||
flags.BlobBatchLimit,
|
||||
flags.BlobBatchLimitBurstFactor,
|
||||
flags.EnableDebugRPCEndpoints,
|
||||
flags.DisableDebugRPCEndpoints,
|
||||
flags.SubscribeToAllSubnets,
|
||||
flags.HistoricalSlasherNode,
|
||||
flags.ChainID,
|
||||
|
||||
@@ -52,6 +52,11 @@ var (
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
deprecatedEnableDebugRPCEndpoints = &cli.BoolFlag{
|
||||
Name: "enable-debug-rpc-endpoints",
|
||||
Usage: deprecatedUsage,
|
||||
Hidden: true,
|
||||
}
|
||||
)
|
||||
|
||||
// Deprecated flags for both the beacon node and validator client.
|
||||
@@ -65,6 +70,7 @@ var deprecatedFlags = []cli.Flag{
|
||||
deprecatedEnableEIP4881,
|
||||
deprecatedDisableEIP4881,
|
||||
deprecatedVerboseSigVerification,
|
||||
deprecatedEnableDebugRPCEndpoints,
|
||||
}
|
||||
|
||||
// deprecatedBeaconFlags contains flags that are still used by other components
|
||||
|
||||
@@ -275,7 +275,6 @@ func (node *BeaconNode) Start(ctx context.Context) error {
|
||||
"--" + cmdshared.ValidatorMonitorIndicesFlag.Name + "=2",
|
||||
"--" + cmdshared.ForceClearDB.Name,
|
||||
"--" + cmdshared.AcceptTosFlag.Name,
|
||||
"--" + flags.EnableDebugRPCEndpoints.Name,
|
||||
"--" + features.EnableQUIC.Name,
|
||||
}
|
||||
if config.UsePprof {
|
||||
|
||||
Reference in New Issue
Block a user