Mirror of https://github.com/OffchainLabs/prysm.git (synced 2026-01-12 23:08:13 -05:00)

Compare commits: dependent_… → cache-atte… (7 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | b694cdc87b |  |
|  | 13f8e7b47f |  |
|  | 0d07559c82 |  |
|  | 51f8fb9703 |  |
|  | 124eadd56e |  |
|  | edcf9e86c5 |  |
|  | 2e8d579d31 |  |
@@ -74,7 +74,6 @@ go_library(
        "//beacon-chain/state:go_default_library",
        "//beacon-chain/state/stategen:go_default_library",
        "//beacon-chain/verification:go_default_library",
        "//cmd/beacon-chain/flags:go_default_library",
        "//config/features:go_default_library",
        "//config/fieldparams:go_default_library",
        "//config/params:go_default_library",
@@ -174,7 +173,6 @@ go_test(
        "//beacon-chain/state/state-native:go_default_library",
        "//beacon-chain/state/stategen:go_default_library",
        "//beacon-chain/verification:go_default_library",
        "//cmd/beacon-chain/flags:go_default_library",
        "//config/features:go_default_library",
        "//config/fieldparams:go_default_library",
        "//config/params:go_default_library",
@@ -14,7 +14,6 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
    statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
-   "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
    coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/db"
@@ -30,7 +29,6 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/state"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
-   "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
    "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
@@ -291,19 +289,6 @@ func (s *Service) StartFromSavedState(saved state.BeaconState) error {
        return errors.Wrap(err, "failed to initialize blockchain service")
    }

-   if !params.FuluEnabled() {
-       return nil
-   }
-
-   earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(saved.Slot())
-   if err != nil {
-       return errors.Wrap(err, "could not get and save custody group count")
-   }
-
-   if _, _, err := s.cfg.P2P.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
-       return errors.Wrap(err, "update custody info")
-   }
-
    return nil
}
@@ -468,73 +453,6 @@ func (s *Service) removeStartupState() {
    s.cfg.FinalizedStateAtStartUp = nil
}

-// UpdateCustodyInfoInDB updates the custody information in the database.
-// It returns the (potentially updated) custody group count and the earliest available slot.
-func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
-   isSupernode := flags.Get().Supernode
-   isSemiSupernode := flags.Get().SemiSupernode
-
-   cfg := params.BeaconConfig()
-   custodyRequirement := cfg.CustodyRequirement
-
-   // Check if the node was previously subscribed to all data subnets, and if so,
-   // store the new status accordingly.
-   wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
-   if err != nil {
-       return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
-   }
-
-   // Compute the target custody group count based on current flag configuration.
-   targetCustodyGroupCount := custodyRequirement
-
-   // Supernode: custody all groups (either currently set or previously enabled)
-   if isSupernode {
-       targetCustodyGroupCount = cfg.NumberOfCustodyGroups
-   }
-
-   // Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
-   if isSemiSupernode {
-       semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
-       if err != nil {
-           return 0, 0, errors.Wrap(err, "minimum custody group count")
-       }
-
-       targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
-   }
-
-   // Safely compute the fulu fork slot.
-   fuluForkSlot, err := fuluForkSlot()
-   if err != nil {
-       return 0, 0, errors.Wrap(err, "fulu fork slot")
-   }
-
-   // If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
-   if slot < fuluForkSlot {
-       slot, err = s.cfg.BeaconDB.EarliestSlot(s.ctx)
-       if err != nil {
-           return 0, 0, errors.Wrap(err, "earliest slot")
-       }
-   }
-
-   earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
-   if err != nil {
-       return 0, 0, errors.Wrap(err, "update custody info")
-   }
-
-   if isSupernode {
-       log.WithFields(logrus.Fields{
-           "current": actualCustodyGroupCount,
-           "target":  cfg.NumberOfCustodyGroups,
-       }).Info("Supernode mode enabled. Will custody all data columns going forward.")
-   }
-
-   if wasSupernode && !isSupernode {
-       log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
-   }
-
-   return earliestAvailableSlot, actualCustodyGroupCount, nil
-}
-
func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
    currentTime := prysmTime.Now()
    if currentTime.After(genesisTime) {
@@ -551,19 +469,3 @@ func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db d
    }
    go slots.CountdownToGenesis(ctx, genesisTime, uint64(gState.NumValidators()), gRoot)
}
-
-func fuluForkSlot() (primitives.Slot, error) {
-   cfg := params.BeaconConfig()
-
-   fuluForkEpoch := cfg.FuluForkEpoch
-   if fuluForkEpoch == cfg.FarFutureEpoch {
-       return cfg.FarFutureSlot, nil
-   }
-
-   forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)
-   if err != nil {
-       return 0, errors.Wrap(err, "epoch start")
-   }
-
-   return forkFuluSlot, nil
-}
@@ -23,11 +23,9 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
    state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/state/stategen"
    "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
    "github.com/OffchainLabs/prysm/v7/config/features"
    fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
    consensusblocks "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
    "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
    "github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -596,218 +594,3 @@ func TestNotifyIndex(t *testing.T) {
        t.Errorf("Notifier channel did not receive the index")
    }
}
-
-func TestUpdateCustodyInfoInDB(t *testing.T) {
-   const (
-       fuluForkEpoch         = 10
-       custodyRequirement    = uint64(4)
-       earliestStoredSlot    = primitives.Slot(12)
-       numberOfCustodyGroups = uint64(64)
-   )
-
-   params.SetupTestConfigCleanup(t)
-   cfg := params.BeaconConfig()
-   cfg.FuluForkEpoch = fuluForkEpoch
-   cfg.CustodyRequirement = custodyRequirement
-   cfg.NumberOfCustodyGroups = numberOfCustodyGroups
-   params.OverrideBeaconConfig(cfg)
-
-   ctx := t.Context()
-   pbBlock := util.NewBeaconBlock()
-   pbBlock.Block.Slot = 12
-   signedBeaconBlock, err := blocks.NewSignedBeaconBlock(pbBlock)
-   require.NoError(t, err)
-
-   roBlock, err := blocks.NewROBlock(signedBeaconBlock)
-   require.NoError(t, err)
-
-   t.Run("CGC increases before fulu", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Before Fulu
-       // -----------
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, custodyRequirement, actualCgc)
-
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, custodyRequirement, actualCgc)
-
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.Supernode = true
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc)
-
-       // After Fulu
-       // ----------
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc)
-   })
-
-   t.Run("CGC increases after fulu", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Before Fulu
-       // -----------
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, custodyRequirement, actualCgc)
-
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
-       require.NoError(t, err)
-       require.Equal(t, earliestStoredSlot, actualEas)
-       require.Equal(t, custodyRequirement, actualCgc)
-
-       // After Fulu
-       // ----------
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.Supernode = true
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc)
-
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc)
-   })
-
-   t.Run("Supernode downgrade prevented", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Enable supernode
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.Supernode = true
-       flags.Init(gFlags)
-
-       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc)
-
-       // Try to downgrade by removing flag
-       gFlags.Supernode = false
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       // Should still be supernode
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
-   })
-
-   t.Run("Semi-supernode downgrade prevented", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Enable semi-supernode
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.SemiSupernode = true
-       flags.Init(gFlags)
-
-       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
-       require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
-
-       // Try to downgrade by removing flag
-       gFlags.SemiSupernode = false
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       // UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
-   })
-
-   t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Start with semi-supernode
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.SemiSupernode = true
-       flags.Init(gFlags)
-
-       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
-       require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups
-
-       // Upgrade to full supernode
-       gFlags.SemiSupernode = false
-       gFlags.Supernode = true
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       // Should upgrade to full supernode
-       upgradeSlot := slot + 2
-       actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
-       require.NoError(t, err)
-       require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
-       require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
-   })
-
-   t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
-       service, requirements := minimalTestService(t)
-       err = requirements.db.SaveBlock(ctx, roBlock)
-       require.NoError(t, err)
-
-       // Enable semi-supernode
-       resetFlags := flags.Get()
-       gFlags := new(flags.GlobalFlags)
-       gFlags.SemiSupernode = true
-       flags.Init(gFlags)
-       defer flags.Init(resetFlags)
-
-       // Mock a high custody requirement (simulating many validators)
-       // We need to override the custody requirement calculation
-       // For this test, we'll verify the logic by checking if custodyRequirement > 64
-       // Since custodyRequirement in minimalTestService is 4, we can't test the high case here
-       // This would require a different test setup with actual validators
-       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
-       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
-       require.NoError(t, err)
-       require.Equal(t, slot, actualEas)
-       semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
-       // With low validator requirements (4), should use semi-supernode minimum (64)
-       require.Equal(t, semiSupernodeCustody, actualCgc)
-   })
-}
@@ -10,20 +10,72 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
    "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/time/slots"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
)

var nilFinalizedStateError = errors.New("finalized state is nil")

-func (s *Service) maintainCustodyInfo() {
+func (s *Service) maintainCustodyInfo() error {
    // Rationale of slot choice:
    // - If syncing with an empty DB from genesis, then justifiedSlot = finalizedSlot = 0,
    //   and the node starts to sync from slot 0 ==> Using justifiedSlot is correct.
    // - If syncing with an empty DB from a checkpoint, then justifiedSlot = finalizedSlot = checkpointSlot,
    //   and the node starts to sync from checkpointSlot ==> Using justifiedSlot is correct.
    // - If syncing with a non-empty DB, then justifiedSlot > finalizedSlot,
    //   and the node starts to sync from justifiedSlot + 1 ==> Using justifiedSlot + 1 is correct.
    const interval = 1 * time.Minute

    finalizedCheckpoint, err := s.cfg.beaconDB.FinalizedCheckpoint(s.ctx)
    if err != nil {
        return errors.Wrap(err, "finalized checkpoint")
    }

    if finalizedCheckpoint == nil {
        return errors.New("finalized checkpoint is nil")
    }

    finalizedSlot, err := slots.EpochStart(finalizedCheckpoint.Epoch)
    if err != nil {
        return errors.Wrap(err, "epoch start for finalized slot")
    }

    justifiedCheckpoint, err := s.cfg.beaconDB.JustifiedCheckpoint(s.ctx)
    if err != nil {
        return errors.Wrap(err, "justified checkpoint")
    }

    if justifiedCheckpoint == nil {
        return errors.New("justified checkpoint is nil")
    }

    justifiedSlot, err := slots.EpochStart(justifiedCheckpoint.Epoch)
    if err != nil {
        return errors.Wrap(err, "epoch start for justified slot")
    }

    slot := justifiedSlot
    if justifiedSlot > finalizedSlot {
        slot++
    }

    earliestAvailableSlot, custodySubnetCount, err := s.updateCustodyInfoInDB(slot)
    if err != nil {
        return errors.Wrap(err, "could not get and save custody group count")
    }

    if _, _, err := s.cfg.p2p.UpdateCustodyInfo(earliestAvailableSlot, custodySubnetCount); err != nil {
        return errors.Wrap(err, "update custody info")
    }

    async.RunEvery(s.ctx, interval, func() {
        if err := s.updateCustodyInfoIfNeeded(); err != nil {
            log.WithError(err).Error("Failed to update custody info")
        }
    })

    return nil
}

func (s *Service) updateCustodyInfoIfNeeded() error {
@@ -12,6 +12,7 @@ import (
    mock "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/db/kv"
+   dbTest "github.com/OffchainLabs/prysm/v7/beacon-chain/db/testing"
    testingDB "github.com/OffchainLabs/prysm/v7/beacon-chain/db/testing"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/peers"
@@ -934,6 +935,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) {
    r := &Service{
        cfg: &config{
            p2p:           p1,
+           beaconDB:      dbTest.SetupDB(t),
            chain:         chain,
            stateNotifier: chain.StateNotifier(),
            initialSync:   &mockSync.Sync{IsSyncing: false},
@@ -17,6 +17,7 @@ import (
    blockfeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/block"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
    statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
+   "github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/db"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/execution"
@@ -33,9 +34,11 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/backfill/coverage"
    "github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
    lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
+   "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
    "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
+   "github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
    leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
    "github.com/OffchainLabs/prysm/v7/crypto/rand"
    "github.com/OffchainLabs/prysm/v7/runtime"
@@ -275,11 +278,6 @@ func (s *Service) Start() {

    s.processPendingBlocksQueue()
    s.maintainPeerStatuses()
-
-   if params.FuluEnabled() {
-       s.maintainCustodyInfo()
-   }
-
    s.resyncIfBehind()

    // Update sync metrics.
@@ -287,6 +285,15 @@ func (s *Service) Start() {

    // Prune data column cache periodically on finalization.
    async.RunEvery(s.ctx, 30*time.Second, s.pruneDataColumnCache)
+
+   if !params.FuluEnabled() {
+       return
+   }
+
+   if err := s.maintainCustodyInfo(); err != nil {
+       log.WithError(err).Error("Failed to maintain custody info")
+   }
}

// Stop the regular sync service.
@@ -452,6 +459,89 @@ func (s *Service) waitForInitialSync(ctx context.Context) error {
    }
}

+// updateCustodyInfoInDB updates the custody information in the database.
+// It returns the earliest available slot and the (potentially updated) custody group count.
+func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
+   isSupernode := flags.Get().Supernode
+   isSemiSupernode := flags.Get().SemiSupernode
+
+   cfg := params.BeaconConfig()
+   custodyRequirement := cfg.CustodyRequirement
+
+   // Check if the node was previously subscribed to all data subnets, and if so,
+   // store the new status accordingly.
+   wasSupernode, err := s.cfg.beaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
+   if err != nil {
+       return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
+   }
+
+   // Compute the target custody group count based on current flag configuration.
+   targetCustodyGroupCount := custodyRequirement
+
+   // Supernode: custody all groups (either currently set or previously enabled)
+   if isSupernode {
+       targetCustodyGroupCount = cfg.NumberOfCustodyGroups
+   }
+
+   // Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
+   if isSemiSupernode {
+       semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
+       if err != nil {
+           return 0, 0, errors.Wrap(err, "minimum custody group count")
+       }
+
+       targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
+   }
+
+   // Safely compute the fulu fork slot.
+   fuluForkSlot, err := fuluForkSlot()
+   if err != nil {
+       return 0, 0, errors.Wrap(err, "fulu fork slot")
+   }
+
+   // If slot is before the fulu fork slot, then use the earliest stored slot as the reference slot.
+   if slot < fuluForkSlot {
+       slot, err = s.cfg.beaconDB.EarliestSlot(s.ctx)
+       if err != nil {
+           return 0, 0, errors.Wrap(err, "earliest slot")
+       }
+   }
+
+   earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.beaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
+   if err != nil {
+       return 0, 0, errors.Wrap(err, "update custody info")
+   }
+
+   if isSupernode {
+       log.WithFields(logrus.Fields{
+           "current": actualCustodyGroupCount,
+           "target":  cfg.NumberOfCustodyGroups,
+       }).Info("Supernode mode enabled. Will custody all data columns going forward.")
+   }
+
+   if wasSupernode && !isSupernode {
+       log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
+   }
+
+   return earliestAvailableSlot, actualCustodyGroupCount, nil
+}
+
+func fuluForkSlot() (primitives.Slot, error) {
+   cfg := params.BeaconConfig()
+
+   fuluForkEpoch := cfg.FuluForkEpoch
+   if fuluForkEpoch == cfg.FarFutureEpoch {
+       return cfg.FarFutureSlot, nil
+   }
+
+   forkFuluSlot, err := slots.EpochStart(fuluForkEpoch)
+   if err != nil {
+       return 0, errors.Wrap(err, "epoch start")
+   }
+
+   return forkFuluSlot, nil
+}
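For intuition, the fork-slot computation above is just the fork epoch times the slots per epoch. A tiny arithmetic sketch — the 32 slots per epoch is the mainnet value, assumed here for illustration, and the fork epoch of 10 is the one used by the tests below:

```go
package main

import "fmt"

func main() {
	const slotsPerEpoch = 32 // mainnet SlotsPerEpoch; assumed for illustration
	const fuluForkEpoch = 10 // value used by TestUpdateCustodyInfoInDB below

	fuluForkSlot := fuluForkEpoch * slotsPerEpoch
	fmt.Println(fuluForkSlot) // 320; reference slots below this fall back to the earliest stored slot
}
```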

// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {
@@ -16,6 +16,9 @@ import (
    "github.com/OffchainLabs/prysm/v7/beacon-chain/startup"
    state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
    mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
    "github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
    "github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
    leakybucket "github.com/OffchainLabs/prysm/v7/container/leaky-bucket"
    "github.com/OffchainLabs/prysm/v7/crypto/bls"
@@ -440,3 +443,224 @@ func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {

    require.Equal(t, false, util.WaitTimeout(&wg, 2*time.Second))
}
+
+func TestUpdateCustodyInfoInDB(t *testing.T) {
+   const (
+       fuluForkEpoch         = 10
+       custodyRequirement    = uint64(4)
+       earliestStoredSlot    = primitives.Slot(12)
+       numberOfCustodyGroups = uint64(64)
+   )
+
+   params.SetupTestConfigCleanup(t)
+   cfg := params.BeaconConfig()
+   cfg.FuluForkEpoch = fuluForkEpoch
+   cfg.CustodyRequirement = custodyRequirement
+   cfg.NumberOfCustodyGroups = numberOfCustodyGroups
+   params.OverrideBeaconConfig(cfg)
+
+   ctx := t.Context()
+   pbBlock := util.NewBeaconBlock()
+   pbBlock.Block.Slot = 12
+   signedBeaconBlock, err := blocks.NewSignedBeaconBlock(pbBlock)
+   require.NoError(t, err)
+
+   roBlock, err := blocks.NewROBlock(signedBeaconBlock)
+   require.NoError(t, err)
+
+   t.Run("CGC increases before fulu", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Before Fulu
+       // -----------
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, custodyRequirement, actualCgc)
+
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, custodyRequirement, actualCgc)
+
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.Supernode = true
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(19)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc)
+
+       // After Fulu
+       // ----------
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc)
+   })
+
+   t.Run("CGC increases after fulu", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Before Fulu
+       // -----------
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(15)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, custodyRequirement, actualCgc)
+
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(17)
+       require.NoError(t, err)
+       require.Equal(t, earliestStoredSlot, actualEas)
+       require.Equal(t, custodyRequirement, actualCgc)
+
+       // After Fulu
+       // ----------
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.Supernode = true
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc)
+
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc)
+   })
+
+   t.Run("Supernode downgrade prevented", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Enable supernode
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.Supernode = true
+       flags.Init(gFlags)
+
+       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc)
+
+       // Try to downgrade by removing the flag
+       gFlags.Supernode = false
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       // Should still be a supernode
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
+   })
+
+   t.Run("Semi-supernode downgrade prevented", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Enable semi-supernode
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.SemiSupernode = true
+       flags.Init(gFlags)
+
+       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       semiSupernodeCustody := numberOfCustodyGroups / 2 // 32 with this test config
+       require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies half of the groups
+
+       // Try to downgrade by removing the flag
+       gFlags.SemiSupernode = false
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       // UpdateCustodyInfo should prevent the downgrade - the custody count should remain at 32
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       require.Equal(t, semiSupernodeCustody, actualCgc) // Still 32 due to downgrade prevention by UpdateCustodyInfo
+   })
+
+   t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Start with semi-supernode
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.SemiSupernode = true
+       flags.Init(gFlags)
+
+       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       semiSupernodeCustody := numberOfCustodyGroups / 2 // 32 with this test config
+       require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies half of the groups
+
+       // Upgrade to full supernode
+       gFlags.SemiSupernode = false
+       gFlags.Supernode = true
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       // Should upgrade to full supernode
+       upgradeSlot := slot + 2
+       actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
+       require.NoError(t, err)
+       require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
+       require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to all 64 custody groups
+   })
+
+   t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
+       beaconDB := dbTest.SetupDB(t)
+       service := Service{cfg: &config{beaconDB: beaconDB}}
+       err = beaconDB.SaveBlock(ctx, roBlock)
+       require.NoError(t, err)
+
+       // Enable semi-supernode
+       resetFlags := flags.Get()
+       gFlags := new(flags.GlobalFlags)
+       gFlags.SemiSupernode = true
+       flags.Init(gFlags)
+       defer flags.Init(resetFlags)
+
+       // Mocking a high custody requirement (simulating many validators) would
+       // require overriding the custody requirement calculation. Since
+       // custodyRequirement in this test config is 4, the high case cannot be
+       // exercised here; it would need a different setup with actual validators.
+       slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
+       actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
+       require.NoError(t, err)
+       require.Equal(t, slot, actualEas)
+       semiSupernodeCustody := numberOfCustodyGroups / 2 // 32 with this test config
+       // With low validator requirements (4), the semi-supernode minimum applies.
+       require.Equal(t, semiSupernodeCustody, actualCgc)
+   })
+}
changelog/james-prysm_cache-attestation-data.md (new file, +3)
@@ -0,0 +1,3 @@
+### Changed
+
+- Post-Electra, attestation data is now requested once per slot and served from a cache for subsequent requests.
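For context, the scheme this entry describes is a single-entry, slot-keyed cache behind an RWMutex with a double-checked write path; the full implementation appears in the validator diff further below. A minimal, generic sketch of that pattern — the `slotCache` type and `fetch` callback here are illustrative, not Prysm APIs:

```go
package slotcache

import "sync"

// slotCache memoizes one value per slot: post-Electra all validators in a
// slot request identical attestation data, so one fetch can serve them all.
type slotCache[T any] struct {
	mu    sync.RWMutex
	slot  uint64
	value *T
}

func (c *slotCache[T]) get(slot uint64, fetch func() (*T, error)) (*T, error) {
	c.mu.RLock()
	if c.slot == slot && c.value != nil {
		v := c.value
		c.mu.RUnlock()
		return v, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Double-check: another goroutine may have filled the cache meanwhile.
	if c.slot == slot && c.value != nil {
		return c.value, nil
	}
	v, err := fetch()
	if err != nil {
		return nil, err
	}
	c.value, c.slot = v, slot
	return v, nil
}
```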
changelog/manu-eas.md (new file, +3)
@@ -0,0 +1,3 @@
+### Fixed
+
+- When adding the `--[semi-]supernode` flag, update the earliest available slot accordingly.
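Concretely, the new TestUpdateCustodyInfoInDB cases above pin down the intended semantics. A summary of the expected post-Fulu transitions under the test configuration (CustodyRequirement = 4, NumberOfCustodyGroups = 64):

| Transition | Earliest available slot | Custody group count |
|---|---|---|
| default → `--supernode` | moves up to the current reference slot | 4 → 64 |
| `--supernode` → flag removed | unchanged | stays 64 (downgrade prevented) |
| default → `--semi-supernode` | moves up to the current reference slot | 4 → 32 (minimum to reconstruct) |
| `--semi-supernode` → `--supernode` | moves up again | 32 → 64 |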
changelog/potuz_use_hashtree.md (new file, +2)
@@ -0,0 +1,2 @@
+### Added
+- Add a feature flag to use hashtree instead of gohashtree.
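The switch is opt-in via the `--enable-hashtree` beacon-node flag defined later in this diff; in-process it is the `EnableHashtree` feature bit. A minimal sketch of how code consults that bit — illustrative only, with `features.Get()` taken from this diff:

```go
package main

import (
	"fmt"

	"github.com/OffchainLabs/prysm/v7/config/features"
)

func main() {
	// The --enable-hashtree CLI flag sets this feature bit; the htr
	// package branches on it to pick a hashing backend.
	if features.Get().EnableHashtree {
		fmt.Println("hashing with github.com/OffchainLabs/hashtree")
	} else {
		fmt.Println("hashing with github.com/prysmaticlabs/gohashtree")
	}
}
```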
@@ -70,6 +70,7 @@ type Flags struct {
    DisableStakinContractCheck bool // Disables check for deposit contract when proposing blocks
    IgnoreUnviableAttestations bool // Ignore attestations whose target state is not viable (avoids lagging-node DoS).

+   EnableHashtree               bool // Enables usage of the hashtree library for hashing
    EnableVerboseSigVerification bool // EnableVerboseSigVerification specifies whether to verify individual signature if batch verification fails
    EnableProposerPreprocessing  bool // EnableProposerPreprocessing enables proposer pre-processing of blocks before proposing.
@@ -237,6 +238,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error {
        logEnabled(enableFullSSZDataLogging)
        cfg.EnableFullSSZDataLogging = true
    }
+   if ctx.IsSet(enableHashtree.Name) {
+       logEnabled(enableHashtree)
+       cfg.EnableHashtree = true
+   }
    cfg.EnableVerboseSigVerification = true
    if ctx.IsSet(disableVerboseSigVerification.Name) {
        logEnabled(disableVerboseSigVerification)
@@ -133,6 +133,10 @@ var (
        Name:  "enable-beacon-rest-api",
        Usage: "(Experimental): Enables of the beacon REST API when querying a beacon node.",
    }
+   enableHashtree = &cli.BoolFlag{
+       Name:  "enable-hashtree",
+       Usage: "(Experimental): Enables the hashtree hashing library.",
+   }
    disableVerboseSigVerification = &cli.BoolFlag{
        Name:  "disable-verbose-sig-verification",
        Usage: "Disables identifying invalid signatures if batch verification fails when processing block.",
@@ -278,6 +282,7 @@ var BeaconChainFlags = combinedFlags([]cli.Flag{
    enableExperimentalAttestationPool,
    forceHeadFlag,
    blacklistRoots,
+   enableHashtree,
}, deprecatedBeaconFlags, deprecatedFlags, upcomingDeprecation)

func combinedFlags(flags ...[]cli.Flag) []cli.Flag {
@@ -36,7 +36,6 @@ go_library(
        "//runtime/version:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_prysmaticlabs_fastssz//:go_default_library",
-       "@com_github_prysmaticlabs_gohashtree//:go_default_library",
        "@org_golang_google_protobuf//proto:go_default_library",
    ],
)
@@ -63,6 +62,7 @@ go_test(
        "//consensus-types/interfaces:go_default_library",
        "//consensus-types/primitives:go_default_library",
        "//container/trie:go_default_library",
+       "//crypto/hash/htr:go_default_library",
        "//encoding/bytesutil:go_default_library",
        "//proto/engine/v1:go_default_library",
        "//proto/prysm/v1alpha1:go_default_library",
@@ -72,6 +72,5 @@ go_test(
        "//testing/require:go_default_library",
        "@com_github_prysmaticlabs_fastssz//:go_default_library",
        "@com_github_prysmaticlabs_go_bitfield//:go_default_library",
-       "@com_github_prysmaticlabs_gohashtree//:go_default_library",
    ],
)
@@ -5,10 +5,10 @@ import (
    "github.com/OffchainLabs/prysm/v7/config/params"
    "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
    "github.com/OffchainLabs/prysm/v7/container/trie"
+   "github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
    "github.com/OffchainLabs/prysm/v7/encoding/ssz"
    "github.com/OffchainLabs/prysm/v7/runtime/version"
    "github.com/pkg/errors"
-   "github.com/prysmaticlabs/gohashtree"
)

const (
@@ -45,7 +45,7 @@ func VerifyKZGInclusionProof(blob ROBlob) error {
        return errInvalidBodyRoot
    }
    chunks := makeChunk(blob.KzgCommitment)
-   gohashtree.HashChunks(chunks, chunks)
+   htr.HashChunks(chunks, chunks)
    verified := trie.VerifyMerkleProof(root, chunks[0][:], blob.Index+KZGOffset, blob.CommitmentInclusionProof)
    if !verified {
        return errInvalidInclusionProof
@@ -182,7 +182,7 @@ func LeavesFromCommitments(commitments [][]byte) [][]byte {
    leaves := make([][]byte, len(commitments))
    for i, kzg := range commitments {
        chunk := makeChunk(kzg)
-       gohashtree.HashChunks(chunk, chunk)
+       htr.HashChunks(chunk, chunk)
        leaves[i] = chunk[0][:]
    }
    return leaves
@@ -8,10 +8,10 @@ import (
    fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
    "github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
    "github.com/OffchainLabs/prysm/v7/container/trie"
+   "github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
    enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
    ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
    "github.com/OffchainLabs/prysm/v7/testing/require"
-   "github.com/prysmaticlabs/gohashtree"
)

func Test_MerkleProofKZGCommitment_Altair(t *testing.T) {
@@ -108,7 +108,7 @@ func Test_MerkleProofKZGCommitment(t *testing.T) {
    require.Equal(t, true, trie.VerifyMerkleProof(root[:], commitmentsRoot[:], kzgPosition, topProof[:len(topProof)-1]))

    chunk := makeChunk(kzgs[index])
-   gohashtree.HashChunks(chunk, chunk)
+   htr.HashChunks(chunk, chunk)
    require.Equal(t, true, trie.VerifyMerkleProof(root[:], chunk[0][:], uint64(index+KZGOffset), proof))
}
@@ -5,7 +5,12 @@ go_library(
    srcs = ["hashtree.go"],
    importpath = "github.com/OffchainLabs/prysm/v7/crypto/hash/htr",
    visibility = ["//visibility:public"],
-   deps = ["@com_github_prysmaticlabs_gohashtree//:go_default_library"],
+   deps = [
+       "//config/features:go_default_library",
+       "@com_github_offchainlabs_hashtree//:go_default_library",
+       "@com_github_prysmaticlabs_gohashtree//:go_default_library",
+       "@com_github_sirupsen_logrus//:go_default_library",
+   ],
)

go_test(
@@ -13,5 +18,8 @@ go_test(
    size = "small",
    srcs = ["hashtree_test.go"],
    embed = [":go_default_library"],
-   deps = ["//testing/require:go_default_library"],
+   deps = [
+       "//config/features:go_default_library",
+       "//testing/require:go_default_library",
+   ],
)
@@ -4,14 +4,38 @@ import (
    "runtime"
    "sync"

+   "github.com/OffchainLabs/hashtree"
+   "github.com/OffchainLabs/prysm/v7/config/features"
    "github.com/prysmaticlabs/gohashtree"
+   log "github.com/sirupsen/logrus"
)

const minSliceSizeToParallelize = 5000

+// Hash hashes chunks pairwise into digests using the configured hashing library.
+// It performs input validation (odd chunks, digest length).
+func Hash(digests, chunks [][32]byte) error {
+   if features.Get().EnableHashtree {
+       return hashtree.Hash(digests, chunks)
+   }
+   return gohashtree.Hash(digests, chunks)
+}
+
+// HashChunks hashes chunks pairwise into digests without error checking.
+// The caller must ensure inputs are valid (even chunks, sufficient digest space).
+func HashChunks(digests, chunks [][32]byte) {
+   if features.Get().EnableHashtree {
+       if err := hashtree.Hash(digests, chunks); err != nil {
+           log.WithError(err).Error("Could not hash chunks")
+       }
+   } else {
+       gohashtree.HashChunks(digests, chunks)
+   }
+}
+
func hashParallel(inputList [][32]byte, outputList [][32]byte, wg *sync.WaitGroup) {
    defer wg.Done()
-   err := gohashtree.Hash(outputList, inputList)
+   err := Hash(outputList, inputList)
    if err != nil {
        panic(err) // lint:nopanic -- This should never panic.
    }
@@ -25,7 +49,7 @@ func hashParallel(inputList [][32]byte, outputList [][32]byte, wg *sync.WaitGrou
func VectorizedSha256(inputList [][32]byte) [][32]byte {
    outputList := make([][32]byte, len(inputList)/2)
    if len(inputList) < minSliceSizeToParallelize {
-       err := gohashtree.Hash(outputList, inputList)
+       err := Hash(outputList, inputList)
        if err != nil {
            panic(err) // lint:nopanic -- This should never panic.
        }
@@ -38,7 +62,7 @@ func VectorizedSha256(inputList [][32]byte) [][32]byte {
    for j := range n {
        go hashParallel(inputList[j*2*groupSize:(j+1)*2*groupSize], outputList[j*groupSize:], &wg)
    }
-   err := gohashtree.Hash(outputList[n*groupSize:], inputList[n*2*groupSize:])
+   err := Hash(outputList[n*groupSize:], inputList[n*2*groupSize:])
    if err != nil {
        panic(err) // lint:nopanic -- This should never panic.
    }
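To make the new dispatch concrete, here is a minimal usage sketch of the two entry points above. The import path and function signatures are taken from this diff; the input values are illustrative:

```go
package main

import (
	"fmt"

	"github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
)

func main() {
	// Pairwise SHA-256: two 32-byte chunks are hashed into one 32-byte digest.
	chunks := make([][32]byte, 2)
	chunks[0][0] = 0x01
	chunks[1][0] = 0x02
	digests := make([][32]byte, 1)

	// Validating variant: returns an error on bad input shapes.
	if err := htr.Hash(digests, chunks); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", digests[0])

	// Non-validating variant: same layout, no error return; the caller
	// must guarantee an even chunk count and enough digest space.
	htr.HashChunks(digests, chunks)
}
```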
@@ -4,6 +4,7 @@ import (
    "sync"
    "testing"

+   "github.com/OffchainLabs/prysm/v7/config/features"
    "github.com/OffchainLabs/prysm/v7/testing/require"
)

@@ -23,3 +24,25 @@ func Test_VectorizedSha256(t *testing.T) {
        require.Equal(t, r, hash2[i])
    }
}
+
+func Test_VectorizedSha256_hashtree_enabled(t *testing.T) {
+   resetCfg := features.InitWithReset(&features.Flags{
+       EnableHashtree: true,
+   })
+   defer resetCfg()
+
+   largeSlice := make([][32]byte, 32*minSliceSizeToParallelize)
+   secondLargeSlice := make([][32]byte, 32*minSliceSizeToParallelize)
+   hash1 := make([][32]byte, 16*minSliceSizeToParallelize)
+   wg := sync.WaitGroup{}
+   wg.Go(func() {
+       tempHash := VectorizedSha256(largeSlice)
+       copy(hash1, tempHash)
+   })
+   wg.Wait()
+   hash2 := VectorizedSha256(secondLargeSlice)
+   require.Equal(t, len(hash1), len(hash2))
+   for i, r := range hash1 {
+       require.Equal(t, r, hash2[i])
+   }
+}
deps.bzl (+9)
@@ -2449,6 +2449,15 @@ def prysm_deps():
        sum = "h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=",
        version = "v1.4.11",
    )
+   go_repository(
+       name = "com_github_offchainlabs_hashtree",
+       build_file_generation = "off",
+       importpath = "github.com/OffchainLabs/hashtree",
+       patch_args = ["-p1"],
+       patches = ["//third_party:com_github_offchainlabs_hashtree.patch"],
+       sum = "h1:R6DAjgAUwwfgji3jEI4WUxtZ3eJ+FbRHjW21UPMBJyo=",
+       version = "v0.2.2",
+   )
    go_repository(
        name = "com_github_oklog_oklog",
        importpath = "github.com/oklog/oklog",
@@ -21,7 +21,6 @@ go_library(
        "@com_github_minio_sha256_simd//:go_default_library",
        "@com_github_pkg_errors//:go_default_library",
        "@com_github_prysmaticlabs_go_bitfield//:go_default_library",
-       "@com_github_prysmaticlabs_gohashtree//:go_default_library",
    ],
)
@@ -6,7 +6,6 @@ import (
    "github.com/OffchainLabs/prysm/v7/container/trie"
    "github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
    "github.com/pkg/errors"
-   "github.com/prysmaticlabs/gohashtree"
)

var errInvalidNilSlice = errors.New("invalid empty slice")

@@ -182,10 +181,10 @@ func MerkleizeListSSZ[T Hashable](elements []T, limit uint64) ([32]byte, error)
    chunks := make([][32]byte, 2)
    chunks[0] = body
    binary.LittleEndian.PutUint64(chunks[1][:], uint64(len(elements)))
-   if err := gohashtree.Hash(chunks, chunks); err != nil {
+   if err = htr.Hash(chunks, chunks); err != nil {
        return [32]byte{}, err
    }
-   return chunks[0], err
+   return chunks[0], nil
}

// MerkleizeByteSliceSSZ hashes a byteslice by chunkifying it and returning the
go.mod (+1)
@@ -6,6 +6,7 @@ require (
    github.com/MariusVanDerWijden/FuzzyVM v0.0.0-20240516070431-7828990cad7d
    github.com/MariusVanDerWijden/tx-fuzz v1.4.0
    github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506
+   github.com/OffchainLabs/hashtree v0.2.3
    github.com/aristanetworks/goarista v0.0.0-20200805130819-fd197cf57d96
    github.com/bazelbuild/rules_go v0.23.2
    github.com/btcsuite/btcd/btcec/v2 v2.3.4

go.sum (+2)
@@ -59,6 +59,8 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506 h1:d/SJkN8/9Ca+1YmuDiUJxAiV4w/a9S8NcsG7GMQSrVI=
github.com/OffchainLabs/go-bitfield v0.0.0-20251031151322-f427d04d8506/go.mod h1:6TZI4FU6zT8x6ZfWa1J8YQ2NgW0wLV/W3fHRca8ISBo=
+github.com/OffchainLabs/hashtree v0.2.3 h1:nM8dBAQZzHLzzM14FaAHXnHTAXZIst69v5xWuS48y/c=
+github.com/OffchainLabs/hashtree v0.2.3/go.mod h1:b07+cRZs+eAR8TR57CB9TQlt5Gnl/06Xs76xt/1wq0M=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
third_party/com_github_offchainlabs_hashtree.patch (vendored, new file, +90)
@@ -0,0 +1,90 @@
diff -urN a/BUILD.bazel b/BUILD.bazel
--- a/BUILD.bazel	1969-12-31 18:00:00.000000000 -0600
+++ b/BUILD.bazel	2025-01-05 12:00:00.000000000 -0600
@@ -0,0 +1,86 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "bindings.go",
+        "sha256_1_generic.go",
+    ] + select({
+        "@io_bazel_rules_go//go/platform:linux_amd64": [
+            "bindings_amd64.go",
+            "wrapper_linux_amd64.s",
+            ":hashtree_amd64_syso",
+        ],
+        "@io_bazel_rules_go//go/platform:windows_amd64": [
+            "bindings_amd64.go",
+            "wrapper_windows_amd64.s",
+            ":hashtree_amd64_syso",
+        ],
+        "@io_bazel_rules_go//go/platform:linux_arm64": [
+            "bindings_arm64.go",
+            "wrapper_arm64.s",
+            ":hashtree_arm64_syso",
+        ],
+        "@io_bazel_rules_go//go/platform:darwin_arm64": [
+            "bindings_arm64.go",
+            "wrapper_arm64.s",
+            ":hashtree_arm64_syso",
+        ],
+        "@io_bazel_rules_go//go/platform:darwin_amd64": [
+            "bindings_darwin_amd64.go",
+        ],
+        "//conditions:default": [],
+    }),
+    importpath = "github.com/OffchainLabs/hashtree",
+    visibility = ["//visibility:public"],
+    deps = ["@com_github_klauspost_cpuid_v2//:go_default_library"],
+)
+
+genrule(
+    name = "hashtree_arm64_syso",
+    srcs = [":hashtree_arm64"],
+    outs = ["hashtree_arm64.syso"],
+    cmd = "cp $(location :hashtree_arm64) $@",
+)
+
+genrule(
+    name = "hashtree_amd64_syso",
+    srcs = [":hashtree_amd64"],
+    outs = ["hashtree_amd64.syso"],
+    cmd = "cp $(location :hashtree_amd64) $@",
+)
+
+cc_library(
+    name = "hashtree_arm64",
+    srcs = [
+        "src/hashtree.c",
+        "src/sha256_generic.c",
+        "src/sha256_armv8_crypto.S",
+        "src/sha256_armv8_neon_x1.S",
+        "src/sha256_armv8_neon_x4.S",
+    ],
+    hdrs = ["src/hashtree.h"],
+    copts = ["-O3", "-Wall"],
+    linkstatic = True,
+    strip_include_prefix = "src",
+    visibility = ["//visibility:public"],
+)
+
+cc_library(
+    name = "hashtree_amd64",
+    srcs = [
+        "src/hashtree.c",
+        "src/sha256_generic.c",
+        "src/sha256_avx_x1.S",
+        "src/sha256_avx_x4.S",
+        "src/sha256_avx_x8.S",
+        "src/sha256_avx_x16.S",
+        "src/sha256_shani.S",
+        "src/sha256_sse_x1.S",
+    ],
+    hdrs = ["src/hashtree.h"],
+    copts = ["-O3", "-Wall", "-fno-integrated-as"],
+    linkstatic = True,
+    strip_include_prefix = "src",
+    visibility = ["//visibility:public"],
+)
@@ -71,17 +71,9 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
        return
    }

-   committeeIndex := duty.CommitteeIndex
-   postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
-   if postElectra {
-       committeeIndex = 0
-   }
-
-   req := &ethpb.AttestationDataRequest{
-       Slot:           slot,
-       CommitteeIndex: committeeIndex,
-   }
-   data, err := v.validatorClient.AttestationData(ctx, req)
+   data, err := v.getAttestationData(ctx, slot, duty.CommitteeIndex)
    if err != nil {
        log.WithError(err).Error("Could not request attestation to sign at slot")
        if v.emitAccountMetrics {
@@ -65,57 +65,60 @@
)

type validator struct {
    duties                             *ethpb.ValidatorDutiesContainer
    ticker                             slots.Ticker
    genesisTime                        time.Time
    logValidatorPerformance            bool
    distributed                        bool
    enableAPI                          bool
    disableDutiesPolling               bool
    emitAccountMetrics                 bool
    aggregatedSlotCommitteeIDCacheLock sync.Mutex
    attLogsLock                        sync.Mutex
    attSelectionLock                   sync.Mutex
    highestValidSlotLock               sync.Mutex
    domainDataLock                     sync.RWMutex
    blacklistedPubkeysLock             sync.RWMutex
    prevEpochBalancesLock              sync.RWMutex
    attestationDataCacheLock           sync.RWMutex
    dutiesLock                         sync.RWMutex
    attestationDataCache               *ethpb.AttestationData
    attestationDataCacheSlot           primitives.Slot
    accountsChangedChannel             chan [][fieldparams.BLSPubkeyLength]byte
    eventsChannel                      chan *eventClient.Event
    highestValidSlot                   primitives.Slot
    submittedAggregates                map[submittedAttKey]*submittedAtt
    graffitiStruct                     *graffiti.Graffiti
    syncCommitteeStats                 syncCommitteeStats
    slotFeed                           *event.Feed
    domainDataCache                    *ristretto.Cache[string, proto.Message]
    aggregatedSlotCommitteeIDCache     *lru.Cache
    attSelections                      map[attSelectionKey]iface.BeaconCommitteeSelection
    interopKeysConfig                  *local.InteropKeymanagerConfig
    duties                             *ethpb.ValidatorDutiesContainer
    signedValidatorRegistrations       map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1
    proposerSettings                   *proposer.Settings
    web3SignerConfig                   *remoteweb3signer.SetupConfig
    startBalances                      map[[fieldparams.BLSPubkeyLength]byte]uint64
    prevEpochBalances                  map[[fieldparams.BLSPubkeyLength]byte]uint64
    blacklistedPubkeys                 map[[fieldparams.BLSPubkeyLength]byte]bool
    pubkeyToStatus                     map[[fieldparams.BLSPubkeyLength]byte]*validatorStatus
    wallet                             *wallet.Wallet
    walletInitializedChan              chan *wallet.Wallet
    walletInitializedFeed              *event.Feed
    graffiti                           []byte
    graffitiStruct                     *graffiti.Graffiti
    graffitiOrderedIndex               uint64
    beaconNodeHosts                    []string
    currentHostIndex                   uint64
    walletInitializedFeed              *event.Feed
    graffitiOrderedIndex               uint64
    submittedAtts                      map[submittedAttKey]*submittedAtt
    validatorsRegBatchSize             int
    validatorClient                    iface.ValidatorClient
    chainClient                        iface.ChainClient
    nodeClient                         iface.NodeClient
    prysmChainClient                   iface.PrysmChainClient
    db                                 db.Database
    km                                 keymanager.IKeymanager
    web3SignerConfig                   *remoteweb3signer.SetupConfig
    proposerSettings                   *proposer.Settings
    signedValidatorRegistrations       map[[fieldparams.BLSPubkeyLength]byte]*ethpb.SignedValidatorRegistrationV1
    validatorsRegBatchSize             int
    interopKeysConfig                  *local.InteropKeymanagerConfig
    attSelections                      map[attSelectionKey]iface.BeaconCommitteeSelection
    aggregatedSlotCommitteeIDCache     *lru.Cache
    domainDataCache                    *ristretto.Cache[string, proto.Message]
    voteStats                          voteStats
    syncCommitteeStats                 syncCommitteeStats
    submittedAtts                      map[submittedAttKey]*submittedAtt
    submittedAggregates                map[submittedAttKey]*submittedAtt
    logValidatorPerformance            bool
    emitAccountMetrics                 bool
    enableAPI                          bool
    distributed                        bool
    domainDataLock                     sync.RWMutex
    attLogsLock                        sync.Mutex
    aggregatedSlotCommitteeIDCacheLock sync.Mutex
    highestValidSlotLock               sync.Mutex
    prevEpochBalancesLock              sync.RWMutex
    blacklistedPubkeysLock             sync.RWMutex
    attSelectionLock                   sync.Mutex
    dutiesLock                         sync.RWMutex
    disableDutiesPolling               bool
    accountsChangedChannel             chan [][fieldparams.BLSPubkeyLength]byte
    eventsChannel                      chan *eventClient.Event
    accountChangedSub                  event.Subscription
    ticker                             slots.Ticker
    beaconNodeHosts                    []string
    genesisTime                        time.Time
    graffiti                           []byte
    voteStats                          voteStats
}

type validatorStatus struct {
@@ -977,6 +980,55 @@ func (v *validator) domainData(ctx context.Context, epoch primitives.Epoch, doma
	return res, nil
}

// getAttestationData fetches attestation data from the beacon node with caching for post-Electra.
// Post-Electra, attestation data is identical for all validators in the same slot (committee index is always 0),
// so we cache it to avoid redundant beacon node requests.
func (v *validator) getAttestationData(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) (*ethpb.AttestationData, error) {
	ctx, span := trace.StartSpan(ctx, "validator.getAttestationData")
	defer span.End()

	postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch

	// Pre-Electra: no caching since committee index varies per validator.
	if !postElectra {
		return v.validatorClient.AttestationData(ctx, &ethpb.AttestationDataRequest{
			Slot:           slot,
			CommitteeIndex: committeeIndex,
		})
	}

	// Post-Electra: check cache first (committee index is always 0).
	v.attestationDataCacheLock.RLock()
	if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
		data := v.attestationDataCache
		v.attestationDataCacheLock.RUnlock()
		return data, nil
	}
	v.attestationDataCacheLock.RUnlock()

	// Cache miss - acquire write lock and fetch.
	v.attestationDataCacheLock.Lock()
	defer v.attestationDataCacheLock.Unlock()

	// Double-check after acquiring write lock (another goroutine may have filled the cache).
	if v.attestationDataCacheSlot == slot && v.attestationDataCache != nil {
		return v.attestationDataCache, nil
	}

	data, err := v.validatorClient.AttestationData(ctx, &ethpb.AttestationDataRequest{
		Slot:           slot,
		CommitteeIndex: 0,
	})
	if err != nil {
		return nil, err
	}

	v.attestationDataCache = data
	v.attestationDataCacheSlot = slot

	return data, nil
}

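A design note on the coalescing above: golang.org/x/sync/singleflight can also collapse duplicate requests, but it only deduplicates calls that are in flight at the same time; a later, sequential request in the same slot would hit the beacon node again, which is presumably why this diff keeps a slot-keyed cache instead. A runnable sketch for comparison (singleflight is not used anywhere in this diff):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var upstream atomic.Int32 // counts simulated beacon node requests

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Concurrent callers sharing the same key share one execution of fn.
			_, _, _ = g.Do("attestation-slot-42", func() (interface{}, error) {
				upstream.Add(1)
				return "attestation data", nil
			})
		}()
	}
	wg.Wait()

	// Overlapping callers collapse to (usually) one upstream request, but a
	// later, sequential call with the same key would fetch again - which is
	// why a cache keyed by slot is the better fit for reuse across a slot.
	fmt.Println("upstream requests:", upstream.Load())
}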
func (v *validator) logDuties(slot primitives.Slot, currentEpochDuties []*ethpb.ValidatorDuty, nextEpochDuties []*ethpb.ValidatorDuty) {
	attesterKeys := make([][]string, params.BeaconConfig().SlotsPerEpoch)
	for i := range attesterKeys {
@@ -2977,3 +2977,207 @@ func TestValidator_CheckDependentRoots(t *testing.T) {
		require.NoError(t, v.checkDependentRoots(ctx, head))
	})
}

func TestGetAttestationData_PreElectraNoCaching(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	client := validatormock.NewMockValidatorClient(ctrl)
	v := &validator{validatorClient: client}

	// Pre-Electra slot (Electra fork epoch is far in the future by default)
	preElectraSlot := primitives.Slot(10)

	expectedData := &ethpb.AttestationData{
		Slot:            preElectraSlot,
		CommitteeIndex:  5,
		BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
		Source:          &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
		Target:          &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
	}

	// Each call should go to the beacon node (no caching pre-Electra)
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           preElectraSlot,
		CommitteeIndex: 5,
	}).Return(expectedData, nil)
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           preElectraSlot,
		CommitteeIndex: 7,
	}).Return(expectedData, nil)

	// First call with committee index 5
	data1, err := v.getAttestationData(context.Background(), preElectraSlot, 5)
	require.NoError(t, err)
	require.DeepEqual(t, expectedData, data1)

	// Second call with different committee index 7 - should still call beacon node
	data2, err := v.getAttestationData(context.Background(), preElectraSlot, 7)
	require.NoError(t, err)
	require.DeepEqual(t, expectedData, data2)
}

func TestGetAttestationData_PostElectraCaching(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Set up Electra fork epoch for this test
	cfg := params.BeaconConfig().Copy()
	originalElectraForkEpoch := cfg.ElectraForkEpoch
	cfg.ElectraForkEpoch = 1
	params.OverrideBeaconConfig(cfg)
	defer func() {
		cfg.ElectraForkEpoch = originalElectraForkEpoch
		params.OverrideBeaconConfig(cfg)
	}()

	client := validatormock.NewMockValidatorClient(ctrl)
	v := &validator{validatorClient: client}

	// Post-Electra slot
	postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)

	expectedData := &ethpb.AttestationData{
		Slot:            postElectraSlot,
		CommitteeIndex:  0,
		BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
		Source:          &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
		Target:          &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
	}

	// Only ONE call should go to the beacon node (caching post-Electra)
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           postElectraSlot,
		CommitteeIndex: 0,
	}).Return(expectedData, nil).Times(1)

	// First call - should hit beacon node
	data1, err := v.getAttestationData(context.Background(), postElectraSlot, 5)
	require.NoError(t, err)
	require.DeepEqual(t, expectedData, data1)

	// Second call with different committee index - should use cache
	data2, err := v.getAttestationData(context.Background(), postElectraSlot, 7)
	require.NoError(t, err)
	require.DeepEqual(t, expectedData, data2)

	// Third call - should still use cache
	data3, err := v.getAttestationData(context.Background(), postElectraSlot, 10)
	require.NoError(t, err)
	require.DeepEqual(t, expectedData, data3)
}

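The copy/override/defer-restore dance around ElectraForkEpoch above is repeated verbatim in the two tests that follow. Assuming the params.SetupTestConfigCleanup helper used elsewhere in this repo's tests is available (an assumption; it does not appear in this diff), the restore can be registered once per test instead of hand-rolled:

// Hypothetical helper, assuming params.SetupTestConfigCleanup snapshots the
// active config and restores it via t.Cleanup, as it does elsewhere in Prysm.
// Imports (testing, params, primitives) are those this test file already uses.
func setElectraForkEpochForTest(t *testing.T, epoch primitives.Epoch) {
	t.Helper()
	params.SetupTestConfigCleanup(t)
	cfg := params.BeaconConfig().Copy()
	cfg.ElectraForkEpoch = epoch
	params.OverrideBeaconConfig(cfg)
}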
func TestGetAttestationData_PostElectraCacheInvalidatesOnNewSlot(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Set up Electra fork epoch for this test
	cfg := params.BeaconConfig().Copy()
	originalElectraForkEpoch := cfg.ElectraForkEpoch
	cfg.ElectraForkEpoch = 1
	params.OverrideBeaconConfig(cfg)
	defer func() {
		cfg.ElectraForkEpoch = originalElectraForkEpoch
		params.OverrideBeaconConfig(cfg)
	}()

	client := validatormock.NewMockValidatorClient(ctrl)
	v := &validator{validatorClient: client}

	slot1 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)
	slot2 := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 6)

	dataSlot1 := &ethpb.AttestationData{
		Slot:            slot1,
		CommitteeIndex:  0,
		BeaconBlockRoot: bytesutil.PadTo([]byte("root1"), 32),
		Source:          &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
		Target:          &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
	}
	dataSlot2 := &ethpb.AttestationData{
		Slot:            slot2,
		CommitteeIndex:  0,
		BeaconBlockRoot: bytesutil.PadTo([]byte("root2"), 32),
		Source:          &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
		Target:          &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
	}

	// Expect one call per slot
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           slot1,
		CommitteeIndex: 0,
	}).Return(dataSlot1, nil).Times(1)
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           slot2,
		CommitteeIndex: 0,
	}).Return(dataSlot2, nil).Times(1)

	// First slot - should hit beacon node
	data1, err := v.getAttestationData(context.Background(), slot1, 5)
	require.NoError(t, err)
	require.DeepEqual(t, dataSlot1, data1)

	// Same slot - should use cache
	data1Again, err := v.getAttestationData(context.Background(), slot1, 7)
	require.NoError(t, err)
	require.DeepEqual(t, dataSlot1, data1Again)

	// New slot - should invalidate cache and hit beacon node
	data2, err := v.getAttestationData(context.Background(), slot2, 5)
	require.NoError(t, err)
	require.DeepEqual(t, dataSlot2, data2)
}

func TestGetAttestationData_PostElectraConcurrentAccess(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Set up Electra fork epoch for this test
	cfg := params.BeaconConfig().Copy()
	originalElectraForkEpoch := cfg.ElectraForkEpoch
	cfg.ElectraForkEpoch = 1
	params.OverrideBeaconConfig(cfg)
	defer func() {
		cfg.ElectraForkEpoch = originalElectraForkEpoch
		params.OverrideBeaconConfig(cfg)
	}()

	client := validatormock.NewMockValidatorClient(ctrl)
	v := &validator{validatorClient: client}

	postElectraSlot := primitives.Slot(params.BeaconConfig().SlotsPerEpoch + 5)

	expectedData := &ethpb.AttestationData{
		Slot:            postElectraSlot,
		CommitteeIndex:  0,
		BeaconBlockRoot: bytesutil.PadTo([]byte("root"), 32),
		Source:          &ethpb.Checkpoint{Epoch: 1, Root: bytesutil.PadTo([]byte("source"), 32)},
		Target:          &ethpb.Checkpoint{Epoch: 2, Root: bytesutil.PadTo([]byte("target"), 32)},
	}

	// Should only call beacon node once despite concurrent requests
	client.EXPECT().AttestationData(gomock.Any(), &ethpb.AttestationDataRequest{
		Slot:           postElectraSlot,
		CommitteeIndex: 0,
	}).Return(expectedData, nil).Times(1)

	var wg sync.WaitGroup
	numGoroutines := 10
	results := make([]*ethpb.AttestationData, numGoroutines)
	errs := make([]error, numGoroutines)

	for i := range numGoroutines {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			results[idx], errs[idx] = v.getAttestationData(context.Background(), postElectraSlot, primitives.CommitteeIndex(idx))
		}(i)
	}

	wg.Wait()

	for i := range numGoroutines {
		require.NoError(t, errs[i])
		require.DeepEqual(t, expectedData, results[i])
	}
}