mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 13:28:01 -05:00
Compare commits
2 Commits
d929e1dcaa
...
fix-bid-ve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e0b6d49ae | ||
|
|
b63cd49834 |
@@ -81,6 +81,7 @@ go_library(
|
||||
"//crypto/rand:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//encoding/ssz:go_default_library",
|
||||
"//io/file:go_default_library",
|
||||
"//math:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//monitoring/tracing/trace:go_default_library",
|
||||
|
||||
@@ -2,17 +2,26 @@ package validator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
|
||||
coreTime "github.com/OffchainLabs/prysm/v6/beacon-chain/core/time"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/transition"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/rpc/core"
|
||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
|
||||
"github.com/OffchainLabs/prysm/v6/config/features"
|
||||
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
|
||||
"github.com/OffchainLabs/prysm/v6/io/file"
|
||||
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
|
||||
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v6/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@@ -32,7 +41,19 @@ func (vs *Server) GetDutiesV2(ctx context.Context, req *ethpb.DutiesRequest) (*e
|
||||
if vs.SyncChecker.Syncing() {
|
||||
return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
|
||||
}
|
||||
return vs.dutiesv2(ctx, req)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Start background profiling that will capture if this takes too long
|
||||
var profileCancel func()
|
||||
if features.Get().SlowDutiesProfile {
|
||||
profileCancel = vs.startSlowDutiesProfiler(start, len(req.PublicKeys), req.Epoch)
|
||||
defer profileCancel()
|
||||
}
|
||||
|
||||
resp, err := vs.dutiesv2(ctx, req)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Compute the validator duties from the head state's corresponding epoch
|
||||
@@ -314,3 +335,138 @@ func populateCommitteeFields(duty *ethpb.DutiesV2Response_Duty, la *helpers.Lite
|
||||
duty.ValidatorCommitteeIndex = la.ValidatorCommitteeIndex
|
||||
duty.AttesterSlot = la.AttesterSlot
|
||||
}
|
||||
|
||||
// startSlowDutiesProfiler starts background profiling that triggers after 2s
|
||||
// Returns a cancel function that should be called when the operation completes
|
||||
func (vs *Server) startSlowDutiesProfiler(startTime time.Time, numValidators int, epoch primitives.Epoch) func() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go func() {
|
||||
// Wait for 2 seconds
|
||||
select {
|
||||
case <-time.After(2 * time.Second):
|
||||
// Operation is taking too long, start profiling
|
||||
vs.captureSlowDutiesProfile(startTime, numValidators, epoch, ctx)
|
||||
case <-ctx.Done():
|
||||
// Operation completed before 2s, no profiling needed
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return cancel
|
||||
}
|
||||
|
||||
// captureSlowDutiesProfile captures CPU and mutex profiles when GetDutiesV2 is slow
|
||||
func (vs *Server) captureSlowDutiesProfile(startTime time.Time, numValidators int, epoch primitives.Epoch, ctx context.Context) {
|
||||
timestamp := time.Now().Format("20060102-150405")
|
||||
|
||||
// Get the datadir from the database path and create debug subdirectory
|
||||
// Cast to Database interface to access DatabasePath method
|
||||
dbWithPath, ok := vs.BeaconDB.(interface{ DatabasePath() string })
|
||||
if !ok {
|
||||
log.Error("Cannot access database path for profiling - database does not implement DatabasePath method")
|
||||
return
|
||||
}
|
||||
dbPath := dbWithPath.DatabasePath()
|
||||
profileDir := filepath.Join(filepath.Dir(dbPath), "debug")
|
||||
|
||||
// Create profile directory if it doesn't exist
|
||||
if err := file.MkdirAll(profileDir); err != nil {
|
||||
log.WithError(err).Warn("Failed to create profile directory")
|
||||
return
|
||||
}
|
||||
|
||||
currentDuration := time.Since(startTime)
|
||||
log.WithFields(logrus.Fields{
|
||||
"currentDuration": currentDuration,
|
||||
"numValidators": numValidators,
|
||||
"epoch": epoch,
|
||||
"profileDir": profileDir,
|
||||
}).Warn("GetDutiesV2 taking longer than 2s, capturing profiles")
|
||||
|
||||
// Start CPU profiling immediately
|
||||
cpuFile, err := os.Create(fmt.Sprintf("%s/cpu-duties-%s.prof", profileDir, timestamp))
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to create CPU profile file")
|
||||
} else {
|
||||
if err := pprof.StartCPUProfile(cpuFile); err != nil {
|
||||
log.WithError(err).Warn("Failed to start CPU profile")
|
||||
if closeErr := cpuFile.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Warn("Failed to close CPU profile file")
|
||||
}
|
||||
} else {
|
||||
// Profile for up to 10 seconds or until context is cancelled
|
||||
go func() {
|
||||
defer func() {
|
||||
pprof.StopCPUProfile()
|
||||
if closeErr := cpuFile.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Warn("Failed to close CPU profile file")
|
||||
}
|
||||
log.WithField("file", cpuFile.Name()).Info("CPU profile captured")
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
// Stop profiling after 10s max
|
||||
case <-ctx.Done():
|
||||
// Stop profiling when operation completes
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Enable mutex profiling
|
||||
runtime.SetMutexProfileFraction(1)
|
||||
|
||||
// Capture snapshot profiles immediately
|
||||
vs.captureSnapshotProfiles(profileDir, timestamp)
|
||||
}
|
||||
|
||||
// captureSnapshotProfiles captures point-in-time profiles
|
||||
func (vs *Server) captureSnapshotProfiles(profileDir, timestamp string) {
|
||||
// Capture mutex profile
|
||||
mutexFile, err := os.Create(fmt.Sprintf("%s/mutex-duties-%s.prof", profileDir, timestamp))
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to create mutex profile file")
|
||||
} else {
|
||||
if err := pprof.Lookup("mutex").WriteTo(mutexFile, 0); err != nil {
|
||||
log.WithError(err).Warn("Failed to write mutex profile")
|
||||
} else {
|
||||
log.WithField("file", mutexFile.Name()).Info("Mutex profile captured")
|
||||
}
|
||||
if closeErr := mutexFile.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Warn("Failed to close mutex profile file")
|
||||
}
|
||||
}
|
||||
|
||||
// Capture goroutine profile
|
||||
goroutineFile, err := os.Create(fmt.Sprintf("%s/goroutine-duties-%s.prof", profileDir, timestamp))
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to create goroutine profile file")
|
||||
} else {
|
||||
if err := pprof.Lookup("goroutine").WriteTo(goroutineFile, 0); err != nil {
|
||||
log.WithError(err).Warn("Failed to write goroutine profile")
|
||||
} else {
|
||||
log.WithField("file", goroutineFile.Name()).Info("Goroutine profile captured")
|
||||
}
|
||||
if closeErr := goroutineFile.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Warn("Failed to close goroutine profile file")
|
||||
}
|
||||
}
|
||||
|
||||
// Capture heap profile
|
||||
heapFile, err := os.Create(fmt.Sprintf("%s/heap-duties-%s.prof", profileDir, timestamp))
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to create heap profile file")
|
||||
} else {
|
||||
runtime.GC() // Force GC before heap profile
|
||||
if err := pprof.Lookup("heap").WriteTo(heapFile, 0); err != nil {
|
||||
log.WithError(err).Warn("Failed to write heap profile")
|
||||
} else {
|
||||
log.WithField("file", heapFile.Name()).Info("Heap profile captured")
|
||||
}
|
||||
if closeErr := heapFile.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Warn("Failed to close heap profile file")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -471,6 +471,11 @@ func isVersionCompatible(bidVersion, headBlockVersion int) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Allow Capella bids for Bellatrix blocks - they have compatible payload formats
|
||||
if bidVersion == version.Capella && headBlockVersion == version.Bellatrix {
|
||||
return true
|
||||
}
|
||||
|
||||
// For all other cases, require exact version match
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -1378,6 +1378,12 @@ func TestIsVersionCompatible(t *testing.T) {
|
||||
headBlockVersion: version.Capella,
|
||||
want: false,
|
||||
},
|
||||
{
|
||||
name: "Capella bid with Bellatrix head block - Compatible",
|
||||
bidVersion: version.Capella,
|
||||
headBlockVersion: version.Bellatrix,
|
||||
want: true,
|
||||
},
|
||||
{
|
||||
name: "Phase0 bid with Altair head block - Not compatible",
|
||||
bidVersion: version.Phase0,
|
||||
|
||||
3
changelog/ttsao_add-capella-bellatrix-compatibility.md
Normal file
3
changelog/ttsao_add-capella-bellatrix-compatibility.md
Normal file
@@ -0,0 +1,3 @@
|
||||
### Fixed
|
||||
|
||||
- Add Capella-Bellatrix bid compatibility to proposer version check
|
||||
@@ -79,6 +79,8 @@ type Flags struct {
|
||||
SaveInvalidBlock bool // SaveInvalidBlock saves invalid block to temp.
|
||||
SaveInvalidBlob bool // SaveInvalidBlob saves invalid blob to temp.
|
||||
|
||||
SlowDutiesProfile bool // SlowDutiesProfile enables performance profiling when GetDutiesV2 is slow.
|
||||
|
||||
EnableDiscoveryReboot bool // EnableDiscoveryReboot allows the node to have its local listener to be rebooted in the event of discovery issues.
|
||||
|
||||
// KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have
|
||||
@@ -202,6 +204,11 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
|
||||
cfg.SaveInvalidBlob = true
|
||||
}
|
||||
|
||||
if ctx.IsSet(slowDutiesProfileFlag.Name) {
|
||||
logEnabled(slowDutiesProfileFlag)
|
||||
cfg.SlowDutiesProfile = true
|
||||
}
|
||||
|
||||
if ctx.IsSet(disableGRPCConnectionLogging.Name) {
|
||||
logDisabled(disableGRPCConnectionLogging)
|
||||
cfg.DisableGRPCConnectionLogs = true
|
||||
|
||||
@@ -45,6 +45,10 @@ var (
|
||||
Name: "save-invalid-blob-temp",
|
||||
Usage: "Writes invalid blobs to temp directory.",
|
||||
}
|
||||
slowDutiesProfileFlag = &cli.BoolFlag{
|
||||
Name: "slow-duties-profile",
|
||||
Usage: "Enable performance profiling when GetDutiesV2 takes longer than 2s. Saves profiles to <datadir>/debug.",
|
||||
}
|
||||
disableGRPCConnectionLogging = &cli.BoolFlag{
|
||||
Name: "disable-grpc-connection-logging",
|
||||
Usage: `WARNING: The gRPC API will remain the default and fully supported through v8 (expected in 2026) but will be eventually removed in favor of REST API..
|
||||
@@ -232,6 +236,7 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
|
||||
writeSSZStateTransitionsFlag,
|
||||
saveInvalidBlockTempFlag,
|
||||
saveInvalidBlobTempFlag,
|
||||
slowDutiesProfileFlag,
|
||||
disableGRPCConnectionLogging,
|
||||
HoleskyTestnet,
|
||||
SepoliaTestnet,
|
||||
|
||||
Reference in New Issue
Block a user